diff --git a/CLAUDE.md b/CLAUDE.md index 5e40263..99b9335 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,13 +10,17 @@ and lifecycle of its own block in `~/.config/imagen.yaml`. ## Architecture ``` -cmd/imagen/ CLI shell — generate, backends, config, serve +cmd/imagen/ CLI shell — generate, worker, backends, config, serve internal/backend/ Backend interface + Registry + Mock reference impl internal/prompt/ Style preset registry (embedded styles.yaml) internal/output/ Filename templating, image writer, JSON sidecar internal/config/ YAML loader, validation, sample generator +internal/cloud/ Supabase Storage + imagen.images writer +internal/usage/ mai.imagen_usage cost-tracking sink +internal/worker/ imagen.jobs queue consumer (DB-agnostic via Queue interface) internal/server/ HTTP stub (not implemented yet — follow-up issue) -docs/ architecture.md, usage.md +scripts/ imagen-worker.service + env template, ComfyUI scripts +docs/ architecture.md, usage.md, setup-worker-mriver.md ``` Data flow for `imagen generate`: diff --git a/cmd/imagen/generate.go b/cmd/imagen/generate.go index b01cce1..cbd5ad6 100644 --- a/cmd/imagen/generate.go +++ b/cmd/imagen/generate.go @@ -132,9 +132,11 @@ func runGenerate(ctx context.Context, args []string) error { fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath) } - if err := maybeCloudSync(ctx, cfg, noCloud, paths, in, res, w, h); err != nil { + if result, err := maybeCloudSync(ctx, cfg, noCloud, "", paths, in, res, w, h); err != nil { // cloud-sync failures are warnings — the image already wrote. fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err) + } else if result != nil && result.ImageID != "" { + fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath) } if err := maybePreview(cfg, previewOn, previewOff, paths.ImagePath, rawPrompt); err != nil { @@ -167,39 +169,45 @@ func resolveCloudSyncMode(cfg *config.Config, noCloudFlag bool, env string) (str } // maybeCloudSync resolves the effective mode and, if it says yes, uploads -// the PNG and inserts the row. Always non-fatal — the image already wrote. -func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) error { +// the PNG and inserts the row. Returns the SyncResult on success so callers +// that need the imagen.images.id (e.g. the worker linking a job row) can pick +// it up. ownerOverride, when non-empty, wins over config + env — the worker +// passes the job row's owner_user_id so each job is attributed correctly. +func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, ownerOverride string, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) (*cloud.SyncResult, error) { mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC")) if err != nil { - return err + return nil, err } if mode == "off" { - return nil + return nil, nil } sink, ok := cloud.NewFromEnv() if !ok { if mode == "on" { - return fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env") + return nil, fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env") } // auto + missing env = silent skip. - return nil + return nil, nil } - // Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID. - if cfg != nil && cfg.OwnerUserID != "" { + switch { + case ownerOverride != "": + sink.OwnerUserID = ownerOverride + case cfg != nil && cfg.OwnerUserID != "": + // Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID. sink.OwnerUserID = cfg.OwnerUserID } if sink.OwnerUserID == "" { if mode == "on" { - return fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty") + return nil, fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty") } // auto + missing UUID = silent skip. - return nil + return nil, nil } pngBytes, readErr := os.ReadFile(paths.ImagePath) if readErr != nil { - return fmt.Errorf("read local image: %w", readErr) + return nil, fmt.Errorf("read local image: %w", readErr) } // Reuse the writer's date/slug/seed so storage_path mirrors the local @@ -257,14 +265,7 @@ func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths } syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - result, err := sink.Sync(syncCtx, syncReq) - if err != nil { - return err - } - if result != nil && result.ImageID != "" { - fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath) - } - return nil + return sink.Sync(syncCtx, syncReq) } func metaString(m map[string]any, key string) string { diff --git a/cmd/imagen/main.go b/cmd/imagen/main.go index e5d227a..3913e44 100644 --- a/cmd/imagen/main.go +++ b/cmd/imagen/main.go @@ -18,6 +18,7 @@ const helpText = `imagen — model-agnostic image generation Usage: imagen generate [flags] generate one image + imagen worker [flags] consume the imagen.jobs queue (daemon) imagen backends list registered backend types imagen config init print a sample imagen.yaml on stdout imagen config validate validate the active config @@ -45,6 +46,8 @@ func main() { switch os.Args[1] { case "generate": err = runGenerate(ctx, args) + case "worker": + err = runWorker(ctx, args) case "backends": err = runBackends(args) case "config": diff --git a/cmd/imagen/worker.go b/cmd/imagen/worker.go new file mode 100644 index 0000000..33ee4e2 --- /dev/null +++ b/cmd/imagen/worker.go @@ -0,0 +1,287 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/jackc/pgx/v5" + + "mgit.msbls.de/m/ImaGen/internal/backend" + "mgit.msbls.de/m/ImaGen/internal/config" + "mgit.msbls.de/m/ImaGen/internal/output" + "mgit.msbls.de/m/ImaGen/internal/prompt" + "mgit.msbls.de/m/ImaGen/internal/worker" +) + +// runWorker is the `imagen worker` subcommand: a long-running daemon that +// consumes the imagen.jobs queue and writes results into imagen.images via +// the same cloud-sync path generate uses. +func runWorker(ctx context.Context, args []string) error { + fs := flag.NewFlagSet("worker", flag.ContinueOnError) + var ( + configPath string + pollInterval time.Duration + jobTimeout time.Duration + ) + fs.StringVar(&configPath, "config", "", "config file path (default: ~/.config/imagen.yaml)") + fs.DurationVar(&pollInterval, "poll-interval", 5*time.Second, "safety-poll cadence between LISTEN wakeups") + fs.DurationVar(&jobTimeout, "job-timeout", 5*time.Minute, "max wall-time per job before the worker marks it failed") + fs.Usage = func() { + fmt.Fprintln(fs.Output(), `Usage: imagen worker [flags] + +Long-running daemon. LISTENs on the Postgres 'imagen_jobs' channel and polls +imagen.jobs every --poll-interval as a safety net, claims pending rows, runs +the generation pipeline, then updates the row with status + image_id. + +Env: + IMAGEN_WORKER_DATABASE_URL Postgres DSN for direct LISTEN + UPDATE. + Required (PostgREST cannot LISTEN). + SUPABASE_URL, SUPABASE_SERVICE_KEY, IMAGEN_OWNER_USER_ID + Reused from generate's cloud-sync path; the + worker writes imagen.images rows through the + same code path. Per-job owner_user_id from the + job row overrides IMAGEN_OWNER_USER_ID.`) + fs.PrintDefaults() + } + if err := fs.Parse(args); err != nil { + return err + } + + cfg, cfgErr := config.Load(configPath) + if cfgErr != nil && !os.IsNotExist(cfgErr) { + return cfgErr + } + + dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL") + if dsn == "" { + return userErr("IMAGEN_WORKER_DATABASE_URL not set; the worker needs a direct Postgres DSN for LISTEN/NOTIFY") + } + + q, err := dialQueue(ctx, dsn) + if err != nil { + return fmt.Errorf("queue: %w", err) + } + defer q.Close() + + p := &workerPipeline{cfg: cfg} + w := worker.New(q, p, worker.Config{ + PollInterval: pollInterval, + JobTimeout: jobTimeout, + Logger: func(format string, a ...any) { fmt.Fprintf(os.Stderr, format+"\n", a...) }, + }) + fmt.Fprintln(os.Stderr, "imagen worker: ready (poll-interval", pollInterval, "job-timeout", jobTimeout, ")") + return w.Run(ctx) +} + +// pgxQueue is the production Queue. It opens one dedicated connection used +// for both LISTEN (long-lived) and UPDATE operations. A second connection +// would split state needlessly — a single worker process processes one job +// at a time so the connection is never contended. +type pgxQueue struct { + conn *pgx.Conn +} + +func dialQueue(ctx context.Context, dsn string) (*pgxQueue, error) { + conn, err := pgx.Connect(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("pgx.Connect: %w", err) + } + if _, err := conn.Exec(ctx, "LISTEN imagen_jobs"); err != nil { + conn.Close(ctx) + return nil, fmt.Errorf("LISTEN imagen_jobs: %w", err) + } + return &pgxQueue{conn: conn}, nil +} + +func (q *pgxQueue) Close() { + if q == nil || q.conn == nil { + return + } + // Best-effort: a 5s budget is enough to send a polite TerminateMessage. + shutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = q.conn.Close(shutdown) +} + +// ClaimNextPending atomically marks the oldest pending row 'running' and +// returns it. FOR UPDATE SKIP LOCKED is belt + braces against a second worker +// process — out of scope for v1 but cheap insurance. +func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) { + const stmt = ` + UPDATE imagen.jobs + SET status='running', started_at=now() + WHERE id = ( + SELECT id FROM imagen.jobs + WHERE status='pending' + ORDER BY created_at + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING id, owner_user_id, prompt, backend, + COALESCE(model,''), + COALESCE(width, 0), COALESCE(height, 0), + COALESCE(steps, 0), COALESCE(seed, 0), + COALESCE(style,'')` + var j worker.Job + err := q.conn.QueryRow(ctx, stmt).Scan( + &j.ID, &j.OwnerUserID, &j.Prompt, &j.Backend, + &j.Model, &j.Width, &j.Height, &j.Steps, &j.Seed, &j.Style, + ) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return &j, nil +} + +func (q *pgxQueue) MarkDone(ctx context.Context, jobID, imageID string) error { + _, err := q.conn.Exec(ctx, + `UPDATE imagen.jobs SET status='done', image_id=$2, completed_at=now() WHERE id=$1`, + jobID, imageID) + return err +} + +func (q *pgxQueue) MarkFailed(ctx context.Context, jobID, msg string) error { + // Trim outrageously long error text so a 10MB stack-trace doesn't end up + // in the row (callers see a summary, full text goes to stderr / logs). + const maxLen = 2000 + if len(msg) > maxLen { + msg = msg[:maxLen] + "... [truncated]" + } + _, err := q.conn.Exec(ctx, + `UPDATE imagen.jobs SET status='failed', error=$2, completed_at=now() WHERE id=$1`, + jobID, msg) + return err +} + +// WaitForJob blocks until a NOTIFY arrives on imagen_jobs, the timeout fires, +// or ctx is cancelled. Notifications during a previous processJob are queued +// by pgx and delivered on the next call — we don't lose wake-ups even when +// processing took longer than poll-interval. +func (q *pgxQueue) WaitForJob(ctx context.Context, timeout time.Duration) error { + waitCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + _, err := q.conn.WaitForNotification(waitCtx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil // poll cadence fired + } + if errors.Is(err, context.Canceled) { + return context.Canceled + } + return err + } + return nil +} + +// ResetStaleRunning bumps any rows stuck in 'running' back to 'pending' so +// they get re-claimed. Called once at startup. A row stuck in 'running' came +// from a previous worker crash; without this, flexsiebels would poll +// forever on a job nobody is processing. +func (q *pgxQueue) ResetStaleRunning(ctx context.Context) error { + _, err := q.conn.Exec(ctx, + `UPDATE imagen.jobs SET status='pending', started_at=NULL WHERE status='running'`) + return err +} + +// workerPipeline is the Pipeline implementation that drives a single job +// through buildBackend → prompt enrichment → generate → write disk → +// cloud-sync, then returns the imagen.images.id back to the worker so it +// can link the row. +type workerPipeline struct { + cfg *config.Config +} + +func (p *workerPipeline) Run(ctx context.Context, job worker.Job) worker.Outcome { + if job.OwnerUserID == "" { + return worker.Outcome{Err: fmt.Errorf("job %s: missing owner_user_id", job.ID)} + } + if job.Prompt == "" { + return worker.Outcome{Err: fmt.Errorf("job %s: empty prompt", job.ID)} + } + if job.Backend == "" { + return worker.Outcome{Err: fmt.Errorf("job %s: missing backend", job.ID)} + } + + be, err := buildBackend(p.cfg, job.Backend) + if err != nil { + return worker.Outcome{Err: fmt.Errorf("backend %q: %w", job.Backend, err)} + } + attachUsageSink(be) + + finalPrompt, err := prompt.Apply(job.Prompt, job.Style) + if err != nil { + return worker.Outcome{Err: fmt.Errorf("style: %w", err)} + } + + req := backend.Request{ + Prompt: finalPrompt, + Width: job.Width, + Height: job.Height, + Steps: job.Steps, + Seed: job.Seed, + Style: job.Style, + } + res, err := be.Generate(ctx, req) + if err != nil { + return worker.Outcome{Err: fmt.Errorf("generate: %w", err)} + } + defer res.ImageReader.Close() + + writer := buildWriter(p.cfg, false) + in := output.Inputs{ + Prompt: job.Prompt, + Backend: be.Name(), + Seed: seedFromMetadata(res.Metadata, job.Seed), + Ext: extFromMime(res.MimeType), + Metadata: res.Metadata, + } + paths, err := writer.Write(res.ImageReader, in) + if err != nil { + return worker.Outcome{Err: fmt.Errorf("write disk: %w", err)} + } + + // Worker is queue-driven: cloud-sync is mandatory because flexsiebels + // needs imagen.images.id to render the result. Pass cloud_sync=on via + // the override path (third arg = ownerUserID); we set the mode by + // disallowing the 'off' branch through the cfg later if the user + // explicitly turned it off in config. + if cloudModeOff(p.cfg) { + // We refuse to silently drop a queued job. If cloud sync is off in + // config, the worker can't serve flexsiebels at all. + return worker.Outcome{Err: fmt.Errorf("output.cloud_sync=off in config; the worker requires cloud_sync=on or auto")} + } + syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height")) + if syncErr != nil { + return worker.Outcome{Err: fmt.Errorf("cloud sync: %w", syncErr)} + } + if syncRes == nil || syncRes.ImageID == "" { + return worker.Outcome{Err: fmt.Errorf("cloud sync returned no imagen.images id (check SUPABASE_URL + SUPABASE_SERVICE_KEY)")} + } + return worker.Outcome{ImageID: syncRes.ImageID} +} + +func cloudModeOff(cfg *config.Config) bool { + if cfg == nil { + return false + } + return strings.EqualFold(cfg.Output.CloudSync, "off") +} + +// dimOrFallback returns job. when the job specified one, otherwise the +// dimension reported by the backend's metadata. Some backends (Replicate +// when given an aspect ratio) round the requested size to their nearest +// supported value; this keeps the row honest about what was actually generated. +func dimOrFallback(jobDim int, res *backend.Result, key string) int { + if jobDim > 0 { + return jobDim + } + return metaInt(res.Metadata, key) +} diff --git a/cmd/imagen/worker_integration_test.go b/cmd/imagen/worker_integration_test.go new file mode 100644 index 0000000..e3f161d --- /dev/null +++ b/cmd/imagen/worker_integration_test.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/jackc/pgx/v5" + + "mgit.msbls.de/m/ImaGen/internal/config" + "mgit.msbls.de/m/ImaGen/internal/worker" +) + +// TestWorker_Integration_EndToEnd runs the full pipeline against a real +// msupabase instance: insert a row into imagen.jobs, let the worker claim +// it, generate via the mock backend (no Replicate spend, no ComfyUI +// dependency), write to Supabase Storage + imagen.images, then flip the job +// to 'done' with the linked image_id. +// +// Guarded by IMAGEN_WORKER_INTEGRATION=1. Required env beyond that: +// +// IMAGEN_WORKER_DATABASE_URL postgres DSN (direct, not PostgREST) +// SUPABASE_URL e.g. https://supa.flexsiebels.de +// SUPABASE_SERVICE_KEY service-role JWT +// IMAGEN_OWNER_USER_ID UUID of an auth.users row (RLS fallback) +// +// The test creates and later deletes its own job row so repeated runs don't +// leave debris. +func TestWorker_Integration_EndToEnd(t *testing.T) { + if os.Getenv("IMAGEN_WORKER_INTEGRATION") != "1" { + t.Skip("set IMAGEN_WORKER_INTEGRATION=1 to run the integration test") + } + dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL") + if dsn == "" { + t.Fatal("IMAGEN_WORKER_DATABASE_URL must be set for the integration test") + } + if os.Getenv("SUPABASE_URL") == "" || os.Getenv("SUPABASE_SERVICE_KEY") == "" { + t.Fatal("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set for the integration test") + } + owner := os.Getenv("IMAGEN_OWNER_USER_ID") + if owner == "" { + t.Fatal("IMAGEN_OWNER_USER_ID must be set for the integration test") + } + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + q, err := dialQueue(ctx, dsn) + if err != nil { + t.Fatalf("dialQueue: %v", err) + } + defer q.Close() + + // Insert the test job on a separate connection (the worker's conn is + // busy LISTENing). Mock backend = no external dependency. + insertConn, err := pgx.Connect(ctx, dsn) + if err != nil { + t.Fatalf("insert conn: %v", err) + } + defer insertConn.Close(ctx) + + var jobID string + prompt := fmt.Sprintf("imagen integration test %d", time.Now().UnixNano()) + err = insertConn.QueryRow(ctx, ` + INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height) + VALUES ($1, $2, 'mock', 64, 64) + RETURNING id`, + owner, prompt).Scan(&jobID) + if err != nil { + t.Fatalf("insert job: %v", err) + } + t.Logf("inserted imagen.jobs id=%s", jobID) + // Tidy up at the end of the test so a re-run starts clean. + defer func() { + cleanup, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _ = insertConn.Exec(cleanup, `DELETE FROM imagen.jobs WHERE id=$1`, jobID) + }() + + // Use a per-test temp dir so the generated PNG doesn't litter the repo. + tmpDir := t.TempDir() + cfg := &config.Config{Output: config.OutputConfig{Directory: tmpDir}} + p := &workerPipeline{cfg: cfg} + w := worker.New(q, p, worker.Config{ + PollInterval: 1 * time.Second, + JobTimeout: 30 * time.Second, + Logger: func(format string, a ...any) { t.Logf("worker: "+format, a...) }, + }) + + // Run the worker until it processes one job (the one we just inserted) + // or the test context times out. + runCtx, runCancel := context.WithCancel(ctx) + done := make(chan struct{}) + go func() { + _ = w.Run(runCtx) + close(done) + }() + + // Poll for completion. + deadline := time.Now().Add(60 * time.Second) + var status, imageID string + for time.Now().Before(deadline) { + err = insertConn.QueryRow(ctx, + `SELECT status, COALESCE(image_id::text,'') FROM imagen.jobs WHERE id=$1`, + jobID).Scan(&status, &imageID) + if err != nil { + t.Fatalf("poll: %v", err) + } + if status == "done" || status == "failed" { + break + } + time.Sleep(500 * time.Millisecond) + } + runCancel() + <-done + + if status != "done" { + var errText string + _ = insertConn.QueryRow(ctx, + `SELECT COALESCE(error,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&errText) + t.Fatalf("job not done within timeout: status=%q error=%q", status, errText) + } + if imageID == "" { + t.Fatalf("job done but image_id is empty") + } + t.Logf("job done: image_id=%s", imageID) +} diff --git a/docs/architecture.md b/docs/architecture.md index 3510c3d..726f552 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -7,7 +7,7 @@ upstream API. Each adapter only ever sees its own slice of `imagen.yaml`. ``` ┌───────────────────────┐ - │ cmd/imagen │ CLI dispatch + │ cmd/imagen │ CLI dispatch (generate / worker / …) │ (or HTTP server) │ └──────────┬────────────┘ │ @@ -18,6 +18,7 @@ upstream API. Each adapter only ever sees its own slice of `imagen.yaml`. │ internal/preview │ tmux-img window spawner │ internal/cloud │ Supabase Storage + imagen.images │ internal/usage │ mai.imagen_usage cost-tracking + │ internal/worker │ imagen.jobs queue consumer └──────────┬────────────┘ │ ┌──────────▼────────────┐ @@ -105,9 +106,37 @@ contains the prompt, backend instance name, seed, ISO timestamp, and the - Network errors during `Generate` — wrap and return; no retry policy yet (decide per-adapter, or move to a shared retry helper if a pattern emerges). +## Async write path: `imagen worker` + `imagen.jobs` + +`imagen generate` is the synchronous CLI. For web callers (flexsiebels' +owner-mode UI) `cmd/imagen worker` runs as a daemon that consumes the +`imagen.jobs` table. + +``` +flexsiebels POST imagen worker (mRiver, systemd) + → INSERT INTO LISTEN imagen_jobs ◄── pg_notify trigger + imagen.jobs(pending) claim row (UPDATE … RETURNING) + dispatch through internal/backend + write disk + cloud-sync via internal/cloud + UPDATE imagen.jobs SET status='done', image_id=… +``` + +The queue table lives next to `imagen.images` in the same `imagen` schema. +Owner-scoped RLS lets the flexsiebels user INSERT + read their own rows; +the worker writes (status updates + image_id link) via service-role which +bypasses RLS. A 5-second safety poll fires on every wake-up to cover +dropped NOTIFY events and worker cold starts with a non-empty queue. See +`docs/setup-worker-mriver.md` for the systemd installation. + +The worker reuses `internal/backend`, `internal/output`, and +`internal/cloud` unchanged — it is purely an orchestration layer around +the same pipeline `imagen generate` drives. + ## Out of scope (today) - Image post-processing (cropping, watermarking). -- Cost-tracking (lands with the Replicate adapter, since only API backends bill). - Multi-image `n>1` per request — backends that support it can expose it via `BackendOpts`; the framework doesn't have a first-class field yet. +- Job cancellation / kill switch — separate follow-up issue. +- Concurrent workers / multi-host scale-out — `FOR UPDATE SKIP LOCKED` in + the claim query makes it cheap to add, but a single worker is the v1 setup. diff --git a/docs/setup-worker-mriver.md b/docs/setup-worker-mriver.md new file mode 100644 index 0000000..30b484e --- /dev/null +++ b/docs/setup-worker-mriver.md @@ -0,0 +1,97 @@ +# `imagen worker` on mRiver + +The worker is a long-running daemon that consumes the `imagen.jobs` queue +(written by flexsiebels' owner-mode UI) and writes the resulting image to +Supabase Storage + `imagen.images` via the same cloud-sync path the CLI +`imagen generate` uses. + +## Architecture + +``` +flexsiebels (owner UI) + | + v INSERT INTO imagen.jobs (...) + | + msupabase Postgres + | + | AFTER INSERT trigger: + | pg_notify('imagen_jobs', NEW.id) + v + imagen worker (mRiver) ── LISTEN imagen_jobs + | + | 1. claim oldest 'pending' row (status='running') + | 2. dispatch to backend (FLUX schnell local / FLUX dev replicate / …) + | 3. write PNG to disk + | 4. upload to Storage + INSERT into imagen.images + | 5. UPDATE imagen.jobs SET status='done', image_id=... + v + flexsiebels polls GET .../jobs/ → renders the rendered card +``` + +A 5-second safety poll covers dropped NOTIFY events and worker cold starts +with a non-empty queue. + +## One-time setup + +```bash +# 1. Build the binary (or `task build`). +cd ~/dev/ImaGen +go build -o bin/imagen ./cmd/imagen + +# 2. Write the environment file. +cp scripts/imagen-worker.env.example ~/.dotfiles/.env.imagen-worker +chmod 600 ~/.dotfiles/.env.imagen-worker +$EDITOR ~/.dotfiles/.env.imagen-worker # fill in real DSN, service key + +# 3. Install the user systemd unit. +mkdir -p ~/.config/systemd/user +cp scripts/imagen-worker.service ~/.config/systemd/user/imagen-worker.service +systemctl --user daemon-reload +systemctl --user enable --now imagen-worker.service + +# 4. Tail the logs. +journalctl --user -u imagen-worker -f +``` + +## Required env vars + +See `scripts/imagen-worker.env.example` for the canonical list. Required: + +- `IMAGEN_WORKER_DATABASE_URL` — direct Postgres DSN. PostgREST cannot LISTEN. +- `SUPABASE_URL`, `SUPABASE_SERVICE_KEY` — same pair `imagen generate` + reads for the cloud-sync writer. +- `IMAGEN_OWNER_USER_ID` — fallback owner UUID; per-job row's + `owner_user_id` overrides this. + +Optional, depending on enabled backends: + +- `REPLICATE_API_TOKEN` if any job will request a Replicate-typed backend. + +## Operating + +```bash +systemctl --user status imagen-worker # health +systemctl --user restart imagen-worker # pick up a new binary +journalctl --user -u imagen-worker -n 200 # recent log lines +``` + +On startup the worker calls `ResetStaleRunning` once, flipping any rows +left in `'running'` from a previous crash back to `'pending'` so they get +re-claimed by the 5-second poll. + +## Smoke test + +With the worker running, INSERT a test job: + +```sql +INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height) +VALUES ( + 'ac6c9501-3757-4a6d-8b97-2cff4288382b', + 'a tiny owl wearing wire-rim glasses, photo', + 'flux-schnell-local', 1024, 1024 +); +``` + +Within ~10 seconds the row should show `status='done'`, a populated +`image_id` linking to a real `imagen.images` row, and a Storage object at +`/-.png` in the `imagen-generated` bucket. diff --git a/go.mod b/go.mod index 7c84c19..5cd50ab 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,16 @@ module mgit.msbls.de/m/ImaGen -go 1.24 +go 1.25.0 -require gopkg.in/yaml.v3 v3.0.1 +require ( + github.com/jackc/pgx/v5 v5.9.2 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum index a62c313..7af349d 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,35 @@ -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 0000000..4474994 --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,213 @@ +// Package worker consumes the imagen.jobs queue. It claims pending rows via +// an UPDATE-returning lock (single source of truth, no double-claim window), +// runs the supplied generation pipeline, then writes status + image_id back. +// +// The package is DB-agnostic: it talks to two small interfaces (Queue + +// Pipeline) so unit tests can drive the claim/transition logic with no real +// Postgres connection. cmd/imagen wires the pgx implementation. +package worker + +import ( + "context" + "errors" + "fmt" + "sync" + "time" +) + +// Job is the slice of an imagen.jobs row the worker needs to drive a +// generation. Null columns from the DB are represented as zero values; the +// pipeline treats zero values as "use backend default" (same convention as +// backend.Request). +type Job struct { + ID string + OwnerUserID string + Prompt string + Backend string + Model string + Width int + Height int + Steps int + Seed int64 + Style string +} + +// Outcome is what the pipeline reports back per job. ImageID is the +// imagen.images.id the cloud-sync produced. Empty ImageID with nil Err means +// the cloud-sync was skipped (config off) — we treat that as a failure for +// the worker since flexsiebels needs the image_id to render the result. +type Outcome struct { + ImageID string + Err error +} + +// Queue is the persistence layer for the imagen.jobs table. Implementations +// must be safe for serialised single-worker use (concurrent claim across +// multiple worker processes is out of scope for v1 — the FOR UPDATE SKIP +// LOCKED clause in the pgx claim query covers it cheaply anyway). +type Queue interface { + // ClaimNextPending atomically marks the oldest pending row 'running' and + // returns it. Returns (nil, nil) when the queue is empty. + ClaimNextPending(ctx context.Context) (*Job, error) + // MarkDone records success: status='done', image_id, completed_at=now(). + MarkDone(ctx context.Context, jobID, imageID string) error + // MarkFailed records failure: status='failed', error=msg, completed_at=now(). + MarkFailed(ctx context.Context, jobID, errMsg string) error + // WaitForJob blocks until either a NOTIFY arrives on imagen_jobs, the + // timeout expires, or ctx is cancelled. Returns nil on notification or + // timeout; returns ctx.Err() on cancellation. Transient connection errors + // are returned so the caller can decide to reconnect. + WaitForJob(ctx context.Context, timeout time.Duration) error + // ResetStaleRunning marks any rows stuck in 'running' (e.g. left over + // from a crash before this process started) back to 'pending'. Called + // once at worker startup so the cold-start safety poll can pick them up. + ResetStaleRunning(ctx context.Context) error +} + +// Pipeline runs one generation and reports back the imagen.images.id (or an +// error). The implementation owns backend dispatch, prompt enrichment, disk +// write, and cloud-sync; the worker only orchestrates queue state. +type Pipeline interface { + Run(ctx context.Context, job Job) Outcome +} + +// Config is the runtime knob set for the worker loop. +type Config struct { + // PollInterval is the safety-poll cadence between LISTEN wakeups. Picking + // this too low wastes DB roundtrips; too high lets a dropped NOTIFY + // stall the queue. 5s is the spec'd default. + PollInterval time.Duration + // JobTimeout caps any single Pipeline.Run. A backend hang shouldn't + // freeze the queue forever. + JobTimeout time.Duration + // Logger receives one-line status events. nil means silent. + Logger func(format string, args ...any) +} + +// Worker is the orchestration loop. It is not reusable across Run calls. +type Worker struct { + q Queue + p Pipeline + cfg Config + + // processingMu guards the in-flight job so SIGTERM-triggered shutdown + // waits for it to complete before returning. + processingMu sync.Mutex +} + +// New constructs a Worker. +func New(q Queue, p Pipeline, cfg Config) *Worker { + if cfg.PollInterval <= 0 { + cfg.PollInterval = 5 * time.Second + } + if cfg.JobTimeout <= 0 { + cfg.JobTimeout = 5 * time.Minute + } + return &Worker{q: q, p: p, cfg: cfg} +} + +// Run drives the consume loop until ctx is cancelled or a fatal queue error +// (e.g. unrecoverable DB drop) is returned. A LISTEN wait can fail with a +// transient transport error; the worker logs and continues so a temporary +// network blip doesn't take it down. +func (w *Worker) Run(ctx context.Context) error { + if err := w.q.ResetStaleRunning(ctx); err != nil { + w.log("worker: reset stale running rows: %v", err) + // Don't return — a stale row will eventually be visible to the poll + // path once flexsiebels gives up and resubmits, and we'd rather keep + // serving fresh jobs than crash here. + } + for { + if err := ctx.Err(); err != nil { + return nil + } + // Drain the queue: claim and process until empty. + if err := w.drain(ctx); err != nil && !errors.Is(err, context.Canceled) { + w.log("worker: drain: %v", err) + } + if err := ctx.Err(); err != nil { + return nil + } + // Wait for the next wake. WaitForJob covers both LISTEN and the + // timeout-based poll fallback; either returns nil and we loop. + if err := w.q.WaitForJob(ctx, w.cfg.PollInterval); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + w.log("worker: wait: %v (continuing)", err) + // Pace the retries so a totally-broken DB doesn't busy-spin. + select { + case <-ctx.Done(): + return nil + case <-time.After(w.cfg.PollInterval): + } + } + } +} + +// drain claims and processes every currently-pending job. The job-scoped +// context is derived from context.Background() so that a SIGTERM mid-job +// still lets the pipeline finish — that's the "no half-state on shutdown" +// guarantee the issue calls for. +func (w *Worker) drain(ctx context.Context) error { + for { + if err := ctx.Err(); err != nil { + return err + } + job, err := w.q.ClaimNextPending(ctx) + if err != nil { + return fmt.Errorf("claim: %w", err) + } + if job == nil { + return nil + } + w.processOne(*job) + } +} + +// processOne runs the pipeline for one already-claimed job and writes the +// outcome back to the queue. The job context is independent of the outer +// ctx so an in-flight job can finish even after SIGTERM. +func (w *Worker) processOne(job Job) { + w.processingMu.Lock() + defer w.processingMu.Unlock() + + w.log("worker: processing job %s backend=%s", job.ID, job.Backend) + jobCtx, cancel := context.WithTimeout(context.Background(), w.cfg.JobTimeout) + defer cancel() + out := w.p.Run(jobCtx, job) + + // Status-update uses Background ctx with a short timeout — we must + // always be able to record the outcome, otherwise the row sits in + // 'running' forever. + updCtx, updCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer updCancel() + if out.Err != nil { + w.log("worker: job %s failed: %v", job.ID, out.Err) + if err := w.q.MarkFailed(updCtx, job.ID, out.Err.Error()); err != nil { + w.log("worker: mark failed for %s: %v", job.ID, err) + } + return + } + if out.ImageID == "" { + // Pipeline reported success but no imagen.images row — treat as + // failure because flexsiebels has nothing to link. + const msg = "pipeline did not return an imagen.images id (cloud sync misconfigured?)" + w.log("worker: job %s: %s", job.ID, msg) + if err := w.q.MarkFailed(updCtx, job.ID, msg); err != nil { + w.log("worker: mark failed for %s: %v", job.ID, err) + } + return + } + if err := w.q.MarkDone(updCtx, job.ID, out.ImageID); err != nil { + w.log("worker: mark done for %s: %v", job.ID, err) + return + } + w.log("worker: job %s done image_id=%s", job.ID, out.ImageID) +} + +func (w *Worker) log(format string, args ...any) { + if w.cfg.Logger != nil { + w.cfg.Logger(format, args...) + } +} diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go new file mode 100644 index 0000000..0bcec30 --- /dev/null +++ b/internal/worker/worker_test.go @@ -0,0 +1,332 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" +) + +// fakeQueue is a hand-rolled in-memory queue that mirrors the contract of a +// real Postgres-backed implementation: ClaimNextPending atomically takes one +// pending row and flips its status to "running", MarkDone/MarkFailed are +// idempotent terminal transitions, WaitForJob blocks until notified or until +// the timeout elapses. +type fakeQueue struct { + mu sync.Mutex + pending []Job + state map[string]string // jobID -> status + last map[string]string // jobID -> error msg or image_id + notify chan struct{} + + claimErr error + doneErr error + failErr error + resetErr error + + claimed int + done int + failed int + resets int +} + +func newFakeQueue(jobs ...Job) *fakeQueue { + q := &fakeQueue{ + state: make(map[string]string), + last: make(map[string]string), + notify: make(chan struct{}, 16), + } + for _, j := range jobs { + q.pending = append(q.pending, j) + q.state[j.ID] = "pending" + } + return q +} + +func (q *fakeQueue) ClaimNextPending(ctx context.Context) (*Job, error) { + q.mu.Lock() + defer q.mu.Unlock() + if q.claimErr != nil { + return nil, q.claimErr + } + if len(q.pending) == 0 { + return nil, nil + } + j := q.pending[0] + q.pending = q.pending[1:] + q.state[j.ID] = "running" + q.claimed++ + return &j, nil +} + +func (q *fakeQueue) MarkDone(ctx context.Context, jobID, imageID string) error { + q.mu.Lock() + defer q.mu.Unlock() + if q.doneErr != nil { + return q.doneErr + } + q.state[jobID] = "done" + q.last[jobID] = imageID + q.done++ + return nil +} + +func (q *fakeQueue) MarkFailed(ctx context.Context, jobID, msg string) error { + q.mu.Lock() + defer q.mu.Unlock() + if q.failErr != nil { + return q.failErr + } + q.state[jobID] = "failed" + q.last[jobID] = msg + q.failed++ + return nil +} + +func (q *fakeQueue) WaitForJob(ctx context.Context, timeout time.Duration) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-q.notify: + return nil + case <-time.After(timeout): + return nil + } +} + +func (q *fakeQueue) ResetStaleRunning(ctx context.Context) error { + q.mu.Lock() + defer q.mu.Unlock() + q.resets++ + return q.resetErr +} + +// pingNotify simulates an INSERT-trigger NOTIFY by waking WaitForJob. +func (q *fakeQueue) pingNotify() { + select { + case q.notify <- struct{}{}: + default: + } +} + +// stub pipeline. +type fakePipeline struct { + mu sync.Mutex + results map[string]Outcome // by job.ID; "" key = default outcome + calls int + delay time.Duration + lastJob Job +} + +func (p *fakePipeline) Run(ctx context.Context, job Job) Outcome { + p.mu.Lock() + p.calls++ + p.lastJob = job + delay := p.delay + out, ok := p.results[job.ID] + if !ok { + out = p.results[""] + } + p.mu.Unlock() + if delay > 0 { + select { + case <-ctx.Done(): + return Outcome{Err: ctx.Err()} + case <-time.After(delay): + } + } + return out +} + +func TestWorker_DonePath(t *testing.T) { + q := newFakeQueue( + Job{ID: "j1", Prompt: "a", Backend: "mock"}, + ) + p := &fakePipeline{results: map[string]Outcome{"j1": {ImageID: "img-1"}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(80 * time.Millisecond) + cancel() + }() + if err := w.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if got := q.state["j1"]; got != "done" { + t.Fatalf("state=%q want done", got) + } + if got := q.last["j1"]; got != "img-1" { + t.Fatalf("image_id=%q want img-1", got) + } + if q.done != 1 || q.failed != 0 { + t.Fatalf("counts: done=%d failed=%d", q.done, q.failed) + } + if p.calls != 1 { + t.Fatalf("pipeline calls=%d want 1", p.calls) + } + if q.resets != 1 { + t.Fatalf("ResetStaleRunning calls=%d want 1", q.resets) + } +} + +func TestWorker_FailedPath_RecordsErrorText(t *testing.T) { + q := newFakeQueue(Job{ID: "j1", Prompt: "a", Backend: "mock"}) + p := &fakePipeline{results: map[string]Outcome{"j1": {Err: errors.New("backend unreachable")}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { time.Sleep(80 * time.Millisecond); cancel() }() + _ = w.Run(ctx) + + if got := q.state["j1"]; got != "failed" { + t.Fatalf("state=%q want failed", got) + } + if got := q.last["j1"]; got != "backend unreachable" { + t.Fatalf("error=%q want %q", got, "backend unreachable") + } + if q.done != 0 || q.failed != 1 { + t.Fatalf("counts: done=%d failed=%d", q.done, q.failed) + } +} + +func TestWorker_MissingImageID_TreatedAsFailure(t *testing.T) { + q := newFakeQueue(Job{ID: "j1", Prompt: "a", Backend: "mock"}) + // Outcome has neither Err nor ImageID — pipeline silently swallowed + // cloud-sync. flexsiebels needs the image_id; without it, fail the job. + p := &fakePipeline{results: map[string]Outcome{"j1": {}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { time.Sleep(80 * time.Millisecond); cancel() }() + _ = w.Run(ctx) + + if got := q.state["j1"]; got != "failed" { + t.Fatalf("state=%q want failed", got) + } + if q.last["j1"] == "" { + t.Fatalf("expected non-empty error explanation for missing image_id") + } +} + +func TestWorker_DrainsMultipleBeforeWaiting(t *testing.T) { + q := newFakeQueue( + Job{ID: "j1", Backend: "mock"}, + Job{ID: "j2", Backend: "mock"}, + Job{ID: "j3", Backend: "mock"}, + ) + p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}} + w := New(q, p, Config{PollInterval: 200 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { time.Sleep(60 * time.Millisecond); cancel() }() + _ = w.Run(ctx) + + for _, id := range []string{"j1", "j2", "j3"} { + if got := q.state[id]; got != "done" { + t.Fatalf("%s state=%q want done", id, got) + } + } + if q.done != 3 { + t.Fatalf("done=%d want 3", q.done) + } +} + +func TestWorker_NotifyWakesEarlierThanPoll(t *testing.T) { + q := newFakeQueue() + p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}} + // Set poll interval high so a working LISTEN is required to see the job + // promptly. Without NOTIFY plumbing this test would time out the worker + // before drain ever runs. + w := New(q, p, Config{PollInterval: 5 * time.Second, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan struct{}) + go func() { + _ = w.Run(ctx) + close(done) + }() + // Append a job and ping the wake channel. + q.mu.Lock() + q.pending = append(q.pending, Job{ID: "late", Backend: "mock"}) + q.state["late"] = "pending" + q.mu.Unlock() + q.pingNotify() + + // Give the worker a beat to claim + process. + deadline := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadline) { + q.mu.Lock() + s := q.state["late"] + q.mu.Unlock() + if s == "done" { + cancel() + <-done + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("worker did not pick up the late job within the 500ms window — NOTIFY wake-up path is broken") +} + +func TestWorker_HonoursContextCancellation(t *testing.T) { + q := newFakeQueue() + p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) + defer cancel() + start := time.Now() + if err := w.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if dur := time.Since(start); dur > 200*time.Millisecond { + t.Fatalf("worker did not exit promptly on ctx cancel: %v", dur) + } +} + +func TestWorker_InflightJobFinishesAfterShutdown(t *testing.T) { + q := newFakeQueue(Job{ID: "long", Backend: "mock"}) + p := &fakePipeline{ + results: map[string]Outcome{"long": {ImageID: "img-long"}}, + delay: 120 * time.Millisecond, + } + // Short JobTimeout would also kill the in-flight job; give it enough + // budget so the test exercises the shutdown-during-job path. + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: 5 * time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + // Let the job start, then cancel mid-flight. + time.Sleep(30 * time.Millisecond) + cancel() + }() + _ = w.Run(ctx) + if got := q.state["long"]; got != "done" { + t.Fatalf("state=%q want done (in-flight job should finish even on shutdown)", got) + } +} + +func TestWorker_TransientClaimErrorDoesNotKillLoop(t *testing.T) { + // First claim returns an error; the loop should log and try again on the + // next wake — it must not propagate the error and exit. + q := newFakeQueue(Job{ID: "j1", Backend: "mock"}) + q.claimErr = fmt.Errorf("transient: connection reset") + p := &fakePipeline{results: map[string]Outcome{"j1": {ImageID: "img"}}} + w := New(q, p, Config{PollInterval: 20 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + + // Heal the claim error after a beat so the second drain succeeds. + go func() { + time.Sleep(40 * time.Millisecond) + q.mu.Lock() + q.claimErr = nil + q.mu.Unlock() + }() + go func() { + time.Sleep(200 * time.Millisecond) + cancel() + }() + if err := w.Run(ctx); err != nil { + t.Fatalf("Run returned: %v (transient claim errors should not kill the loop)", err) + } + if got := q.state["j1"]; got != "done" { + t.Fatalf("state=%q want done", got) + } +} diff --git a/scripts/imagen-worker.env.example b/scripts/imagen-worker.env.example new file mode 100644 index 0000000..c3c738d --- /dev/null +++ b/scripts/imagen-worker.env.example @@ -0,0 +1,22 @@ +# Environment for the imagen-worker.service systemd unit. +# Copy to ~/.dotfiles/.env.imagen-worker and fill in real values. +# Never commit the populated file — it carries the Supabase service-role key. + +# Direct Postgres DSN for LISTEN/NOTIFY + imagen.jobs UPDATE statements. +# PostgREST cannot LISTEN, so the worker connects to Postgres directly. +# Host + port + password come from the msupabase compose env on mlake. +IMAGEN_WORKER_DATABASE_URL=postgres://postgres:CHANGE_ME@100.99.98.201:6789/postgres?sslmode=disable + +# PostgREST endpoint for the imagen.images cloud-sync writer (same as +# `imagen generate`'s cloud-sync code path). +SUPABASE_URL=https://supa.flexsiebels.de +SUPABASE_SERVICE_KEY=CHANGE_ME + +# Default owner_user_id. Per-job owner from the imagen.jobs row overrides +# this, so it's only used as a fallback when a job arrives with a NULL +# owner_user_id — which the schema disallows. Keep it set for safety. +IMAGEN_OWNER_USER_ID=ac6c9501-3757-4a6d-8b97-2cff4288382b + +# Optional: REPLICATE_API_TOKEN if any imagen.jobs.backend may resolve to +# a Replicate adapter instance. +# REPLICATE_API_TOKEN=CHANGE_ME diff --git a/scripts/imagen-worker.service b/scripts/imagen-worker.service new file mode 100644 index 0000000..0c3f809 --- /dev/null +++ b/scripts/imagen-worker.service @@ -0,0 +1,19 @@ +[Unit] +Description=ImaGen worker (consumes imagen.jobs queue) +Documentation=https://mgit.msbls.de/m/ImaGen/issues/8 +Wants=network-online.target +After=network-online.target + +[Service] +Type=simple +ExecStart=%h/dev/ImaGen/bin/imagen worker +WorkingDirectory=%h/dev/ImaGen +EnvironmentFile=%h/.dotfiles/.env.imagen-worker +Restart=on-failure +RestartSec=5 +# Give the worker time to finish an in-flight generation on shutdown +# (FLUX dev up to ~30s, plus the cloud-sync write-back). +TimeoutStopSec=60 + +[Install] +WantedBy=default.target