Files
ImaGen/docs/setup-worker-mriver.md
mAi 2758c5a500 mAi: #8 - imagen.jobs queue + worker subcommand (flexsiebels write path)
Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into
imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY +
5s safety poll, runs the same generate pipeline imagen generate uses, and
writes the result through internal/cloud into imagen.images.

- Schema migration imagen_jobs_init: table + status CHECK + two indexes +
  owner-scoped RLS + grants + AFTER INSERT trigger publishing on the
  imagen_jobs channel via pg_notify.
- internal/worker: DB-agnostic loop over a Queue interface. Drains the
  whole pending backlog on each wake. Job-scoped contexts are derived
  from Background so SIGTERM lets the in-flight generation finish (no
  half-state). ResetStaleRunning at startup unsticks rows left over from
  a previous crash. Eight unit tests cover the done / failed / missing-id /
  drain / NOTIFY-wake / shutdown / transient-error paths against a fake
  queue (no real Postgres in CI).
- cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN +
  UPDATE), plus the workerPipeline that reuses buildBackend +
  attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The
  per-job owner_user_id overrides the env-level fallback so each row in
  imagen.images is attributed correctly.
- maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can
  link imagen.jobs.image_id to the inserted imagen.images row. The CLI
  generate path keeps printing its stderr summary unchanged.
- scripts/imagen-worker.service + .env.example for the systemd --user unit
  on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed.
- docs/setup-worker-mriver.md walks through installation + the spec's
  SQL-INSERT smoke; docs/architecture.md grows an "async write path"
  section.
- worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1)
  drives one real job through the full pipeline against msupabase using
  the mock backend, then verifies imagen.images + Storage object landed
  and the row flipped to done with image_id linked. Verified end-to-end:
  pickup latency ~7ms, total 74ms, failure path captures error text.
2026-05-11 10:23:33 +02:00

3.0 KiB

imagen worker on mRiver

The worker is a long-running daemon that consumes the imagen.jobs queue (written by flexsiebels' owner-mode UI) and writes the resulting image to Supabase Storage + imagen.images via the same cloud-sync path the CLI imagen generate uses.

Architecture

flexsiebels (owner UI)
        |
        v  INSERT INTO imagen.jobs (...)
        |
   msupabase Postgres
        |
        |   AFTER INSERT trigger:
        |   pg_notify('imagen_jobs', NEW.id)
        v
   imagen worker (mRiver) ── LISTEN imagen_jobs
        |
        |   1. claim oldest 'pending' row (status='running')
        |   2. dispatch to backend (FLUX schnell local / FLUX dev replicate / …)
        |   3. write PNG to disk
        |   4. upload to Storage + INSERT into imagen.images
        |   5. UPDATE imagen.jobs SET status='done', image_id=...
        v
   flexsiebels polls GET .../jobs/<id> → renders the rendered card

A 5-second safety poll covers dropped NOTIFY events and worker cold starts with a non-empty queue.

One-time setup

# 1. Build the binary (or `task build`).
cd ~/dev/ImaGen
go build -o bin/imagen ./cmd/imagen

# 2. Write the environment file.
cp scripts/imagen-worker.env.example ~/.dotfiles/.env.imagen-worker
chmod 600 ~/.dotfiles/.env.imagen-worker
$EDITOR ~/.dotfiles/.env.imagen-worker   # fill in real DSN, service key

# 3. Install the user systemd unit.
mkdir -p ~/.config/systemd/user
cp scripts/imagen-worker.service ~/.config/systemd/user/imagen-worker.service
systemctl --user daemon-reload
systemctl --user enable --now imagen-worker.service

# 4. Tail the logs.
journalctl --user -u imagen-worker -f

Required env vars

See scripts/imagen-worker.env.example for the canonical list. Required:

  • IMAGEN_WORKER_DATABASE_URL — direct Postgres DSN. PostgREST cannot LISTEN.
  • SUPABASE_URL, SUPABASE_SERVICE_KEY — same pair imagen generate reads for the cloud-sync writer.
  • IMAGEN_OWNER_USER_ID — fallback owner UUID; per-job row's owner_user_id overrides this.

Optional, depending on enabled backends:

  • REPLICATE_API_TOKEN if any job will request a Replicate-typed backend.

Operating

systemctl --user status imagen-worker          # health
systemctl --user restart imagen-worker         # pick up a new binary
journalctl --user -u imagen-worker -n 200      # recent log lines

On startup the worker calls ResetStaleRunning once, flipping any rows left in 'running' from a previous crash back to 'pending' so they get re-claimed by the 5-second poll.

Smoke test

With the worker running, INSERT a test job:

INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
VALUES (
  'ac6c9501-3757-4a6d-8b97-2cff4288382b',
  'a tiny owl wearing wire-rim glasses, photo',
  'flux-schnell-local', 1024, 1024
);

Within ~10 seconds the row should show status='done', a populated image_id linking to a real imagen.images row, and a Storage object at <YYYY-MM-DD>/<slug>-<seed>.png in the imagen-generated bucket.