Introducing the Workflow Engine: Multi-Step Orchestration Without Temporal or Inngest
Job queues handle individual tasks well, but real-world processes span multiple steps: charge payment, reserve inventory, send confirmation, notify shipping. When step three fails, you need to undo steps one and two. That is orchestration — and until now, it required a dedicated service like Temporal, Inngest, or Trigger.dev.
bunqueue 2.7 changes that. The new Workflow Engine lets you define multi-step processes with a fluent TypeScript DSL, run saga compensation on failure, branch conditionally, and pause for human approval — all inside the same queue infrastructure you already use. Zero new services. Zero cloud dependencies.
Why Build It In?
Most teams reach a point where individual jobs are not enough. An order pipeline, a CI/CD deployment, an onboarding flow — these are sequences of jobs with dependencies, rollback logic, and sometimes human decisions in the middle.
The existing options all introduce significant complexity:
- Temporal needs PostgreSQL, Elasticsearch, and 7+ services running
- Inngest is cloud-only with vendor lock-in
- Trigger.dev requires Redis and PostgreSQL alongside your app
bunqueue’s approach: if you already have a job queue with SQLite persistence, the execution engine is a natural extension. No new databases, no new protocols, no new deployment targets.
The API
Define a workflow with chained steps:
```ts
import { Workflow, Engine } from 'bunqueue/workflow';

const order = new Workflow('order-pipeline')
  .step('validate', async (ctx) => {
    const { orderId } = ctx.input as { orderId: string };
    // Validate order exists and is payable
    return { orderId, validated: true };
  })
  .step('charge', async (ctx) => {
    // Charge the customer
    return { transactionId: 'txn_abc123' };
  }, {
    compensate: async (ctx) => {
      // Refund if a later step fails
      const { transactionId } = ctx.steps.charge as { transactionId: string };
      await refundPayment(transactionId);
    },
  })
  .waitFor('manager-approval')
  .step('ship', async (ctx) => {
    return { tracking: 'TRACK-789' };
  });
```

Start the engine and run it:
```ts
const engine = new Engine({ embedded: true });
engine.register(order);
await engine.start();

const run = await engine.run('order-pipeline', { orderId: 'ORD-1' });
```

That is the entire setup. No YAML, no JSON state machines, no separate orchestrator process.
Saga Compensation
The saga pattern is first-class. Each step can declare a compensate handler. When any step fails, bunqueue runs compensation handlers in reverse order — exactly like a distributed transaction rollback:
```
step1 ✅ → step2 ✅ → step3 ❌
                        ↓
    compensate(step2) → compensate(step1)
```

Compensation handlers receive the same context with all previous step results, so they have everything needed to undo their work.
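The reverse-order unwind is simple to reason about in isolation. Here is a minimal, engine-independent sketch of the mechanism — the `runSaga` helper and the step shape are illustrative, not bunqueue's internals:

```ts
type SagaStep = {
  name: string;
  run: () => Promise<unknown>;
  compensate?: (result: unknown) => Promise<void>;
};

// Runs steps in order; on the first failure, runs the compensate
// handlers of every completed step in reverse order, then stops.
async function runSaga(steps: SagaStep[]): Promise<string[]> {
  const log: string[] = [];
  const completed: { step: SagaStep; result: unknown }[] = [];
  for (const step of steps) {
    try {
      const result = await step.run();
      completed.push({ step, result });
      log.push(`ok:${step.name}`);
    } catch {
      log.push(`fail:${step.name}`);
      // Unwind in reverse completion order, like a transaction rollback.
      for (const done of completed.reverse()) {
        if (done.step.compensate) {
          await done.step.compensate(done.result);
          log.push(`undo:${done.step.name}`);
        }
      }
      break;
    }
  }
  return log;
}

runSaga([
  { name: 'charge', run: async () => 'txn_1', compensate: async () => { /* refund */ } },
  { name: 'reserve', run: async () => 'res_1', compensate: async () => { /* release */ } },
  { name: 'ship', run: async () => { throw new Error('carrier down'); } },
]).then((log) => console.log(log.join(' ')));
// → ok:charge ok:reserve fail:ship undo:reserve undo:charge
```

The real engine does the same walk, except each compensation handler runs as a persisted job, so the rollback itself survives a crash.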
Conditional Branching
Not every workflow is linear. Use .branch() and .path() to route execution based on runtime data:
```ts
const kyc = new Workflow('kyc-onboarding')
  .step('score', async (ctx) => {
    return { riskLevel: evaluateRisk(ctx.input) };
  })
  .branch((ctx) => {
    const score = ctx.steps.score as { riskLevel: string };
    return score.riskLevel; // 'low', 'medium', or 'high'
  })
  .path('low', (w) => w.step('auto-approve', async () => ({ approved: true })))
  .path('medium', (w) => w.step('manual-review', async () => ({ reviewRequired: true })))
  .path('high', (w) => w.step('escalate', async () => ({ escalated: true })))
  .step('finalize', async (ctx) => {
    return { completed: true };
  });
```

Human-in-the-Loop
Some workflows need a human decision before continuing. .waitFor() pauses execution until an external signal arrives:
```ts
const deploy = new Workflow('deploy')
  .step('build', async (ctx) => { /* ... */ })
  .step('test', async (ctx) => { /* ... */ })
  .waitFor('qa-approval')
  .step('deploy-prod', async (ctx) => { /* ... */ });
```
```ts
// Later, when QA approves:
await engine.signal(runId, 'qa-approval', { approvedBy: 'qa-lead' });
```

The workflow resumes exactly where it left off. Execution state is persisted in SQLite, so it survives process restarts.
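Conceptually, a wait point is a named gate that a signal unblocks. The in-memory sketch below shows that mechanism in isolation — the `SignalHub` class is illustrative, not bunqueue's API; the real engine persists the gate to SQLite instead of holding a promise in memory, which is why it survives restarts:

```ts
// Maps signal names to resolvers of runs currently paused at a gate.
class SignalHub {
  private waiters = new Map<string, (payload: unknown) => void>();

  // The engine calls this when execution reaches a wait point.
  waitFor(name: string): Promise<unknown> {
    return new Promise((resolve) => this.waiters.set(name, resolve));
  }

  // The outside world (an approval endpoint, a CLI) calls this to resume.
  signal(name: string, payload: unknown): boolean {
    const resolve = this.waiters.get(name);
    if (!resolve) return false; // nothing is paused on this signal
    this.waiters.delete(name);
    resolve(payload);
    return true;
  }
}

async function demoApproval(): Promise<unknown> {
  const hub = new SignalHub();
  const gate = hub.waitFor('qa-approval');              // workflow pauses here
  hub.signal('qa-approval', { approvedBy: 'qa-lead' }); // QA approves
  return gate;                                          // workflow resumes
}

demoApproval().then((payload) => console.log(payload));
// → { approvedBy: 'qa-lead' }
```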
Step Retry, Parallel Steps & Sub-Workflows
Steps retry automatically with exponential backoff:
```ts
.step('call-api', async () => {
  const res = await fetch('https://api.flaky.com/data');
  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  return await res.json();
}, { retry: 5, timeout: 10000 })
```

Run independent steps concurrently with .parallel():
```ts
.parallel((w) => w
  .step('fetch-orders', async () => await db.orders.list())
  .step('fetch-prefs', async () => await db.preferences.get())
  .step('fetch-activity', async () => await analytics.recent())
)
```

Compose workflows by nesting them with .subWorkflow():
```ts
const parent = new Workflow('order')
  .step('create', async () => ({ total: 99 }))
  .subWorkflow('payment', (ctx) => ({
    amount: (ctx.steps['create'] as { total: number }).total,
  }))
  .step('confirm', async (ctx) => {
    const result = ctx.steps['sub:payment'];
    return { done: true };
  });
```

Observability & Lifecycle Events
Subscribe to typed events for monitoring:
```ts
engine.on('step:failed', (e) => alerting.send(e));
engine.onAny((e) => metrics.increment(`workflow.${e.type}`));
```

Eleven event types cover the full lifecycle: workflow:started, step:completed, step:retry, signal:timeout, and more.
Loops, forEach & Map
Iterate with doUntil / doWhile, process lists with forEach, and transform data with map:
```ts
const pipeline = new Workflow('etl')
  .step('fetch', async () => ({ records: await db.getAll() }))
  .forEach(
    (ctx) => (ctx.steps['fetch'] as any).records,
    'enrich',
    async (ctx) => {
      const record = ctx.steps.__item;
      return await enrichment.process(record);
    },
  )
  .map('aggregate', (ctx) => {
    // Collect all forEach results
    const results = [];
    let i = 0;
    while (ctx.steps[`enrich:${i}`]) {
      results.push(ctx.steps[`enrich:${i}`]);
      i++;
    }
    return { total: results.length };
  })
  .doUntil(
    (ctx) => (ctx.steps['sync'] as any)?.synced === true,
    (w) => w.step('sync', async () => {
      const ok = await externalApi.sync();
      return { synced: ok };
    }),
    { maxIterations: 10 }
  );
```

Schema Validation & Subscribe
Validate step inputs and outputs with any .parse() schema (Zod, ArkType, Valibot) and monitor specific executions:
```ts
import { z } from 'zod';

const flow = new Workflow('validated')
  .step('process', async (ctx) => ({ id: 'usr_1', valid: true }), {
    inputSchema: z.object({ email: z.string().email() }),
    outputSchema: z.object({ id: z.string(), valid: z.boolean() }),
  });
```
```ts
const run = await engine.run('validated', { email: 'user@test.com' });
const unsub = engine.subscribe(run.id, (e) => console.log(e.type));
```

Cleanup & Archival
Archive old executions to keep your SQLite lean:
```ts
engine.archive(30 * 24 * 60 * 60 * 1000); // Archive 30-day-old executions
engine.cleanup(7 * 24 * 60 * 60 * 1000);  // Delete 7-day-old executions
```

How It Works Internally
The workflow engine is built on top of bunqueue’s existing Queue and Worker:
- Workflow is a pure data structure — a DAG of step definitions, branch conditions, and wait points
- Engine wraps a Queue and Worker pair, using them to schedule and process step jobs
- Execution state is stored in SQLite via WorkflowStore, tracking which steps completed, their results, and received signals
- Each step runs as a regular bunqueue job with retry, timeout, and stall detection
This means workflows get all of bunqueue’s existing reliability features for free: persistence, heartbeats, DLQ, and S3 backups.
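To make the state model concrete, here is a toy stand-in for the step-state store — the `MemoryWorkflowStore` class and its record shape are illustrative assumptions, not the real WorkflowStore, which writes equivalent records to SQLite:

```ts
type StepState = 'pending' | 'completed' | 'failed';

interface StepRecord {
  runId: string;
  step: string;
  state: StepState;
  result?: unknown;
}

// One record per (run, step) pair, keyed like a composite primary key.
class MemoryWorkflowStore {
  private records = new Map<string, StepRecord>();

  private key(runId: string, step: string): string {
    return `${runId}:${step}`;
  }

  markCompleted(runId: string, step: string, result: unknown): void {
    this.records.set(this.key(runId, step), { runId, step, state: 'completed', result });
  }

  get(runId: string, step: string): StepRecord | undefined {
    return this.records.get(this.key(runId, step));
  }

  // On restart, the engine resumes at the first step without a
  // completed record, instead of re-running finished work.
  nextStep(runId: string, steps: string[]): string | undefined {
    return steps.find((s) => this.get(runId, s)?.state !== 'completed');
  }
}

const store = new MemoryWorkflowStore();
store.markCompleted('run-1', 'validate', { validated: true });
store.markCompleted('run-1', 'charge', { transactionId: 'txn_abc123' });
console.log(store.nextStep('run-1', ['validate', 'charge', 'ship']));
// → ship
```

Because each record also holds the step's result, compensation handlers and later steps can read prior outputs after a restart without re-executing anything.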
When to Use It
The workflow engine is ideal for:
- E-commerce pipelines — validate, charge, fulfill, notify with rollback on failure
- CI/CD deployments — build, test, await approval, deploy
- Onboarding flows — verify identity, score risk, branch by result
- Data pipelines — extract, transform, load with per-step error handling
- Approval workflows — any process that needs human sign-off mid-execution
For simple fire-and-forget jobs, retries, or cron schedules, the standard Queue + Worker API is still the right choice. The workflow engine adds value when you need step dependencies, rollback, or human decisions.
Get Started
```sh
bun add bunqueue
```

```ts
import { Workflow, Engine } from 'bunqueue/workflow';
```

Read the full guide: Workflow Engine Documentation