Skip to content

Introducing the Workflow Engine: Multi-Step Orchestration Without Temporal or Inngest

Job queues handle individual tasks well, but real-world processes span multiple steps: charge payment, reserve inventory, send confirmation, notify shipping. When step three fails, you need to undo steps one and two. That is orchestration — and until now, it required a dedicated service like Temporal, Inngest, or Trigger.dev.

bunqueue 2.7 changes that. The new Workflow Engine lets you define multi-step processes with a fluent TypeScript DSL, run saga compensation on failure, branch conditionally, and pause for human approval — all inside the same queue infrastructure you already use. Zero new services. Zero cloud dependencies.

Most teams reach a point where individual jobs are not enough. An order pipeline, a CI/CD deployment, an onboarding flow — these are sequences of jobs with dependencies, rollback logic, and sometimes human decisions in the middle.

The existing options all introduce significant complexity:

  • Temporal needs PostgreSQL, Elasticsearch, and 7+ services running
  • Inngest is cloud-only with vendor lock-in
  • Trigger.dev requires Redis and PostgreSQL alongside your app

bunqueue’s approach: if you already have a job queue with SQLite persistence, the execution engine is a natural extension. No new databases, no new protocols, no new deployment targets.

Define a workflow with chained steps:

import { Workflow, Engine } from 'bunqueue/workflow';
const order = new Workflow('order-pipeline')
.step('validate', async (ctx) => {
const { orderId } = ctx.input as { orderId: string };
// Validate order exists and is payable
return { orderId, validated: true };
})
.step('charge', async (ctx) => {
// Charge the customer
return { transactionId: 'txn_abc123' };
}, {
compensate: async (ctx) => {
// Refund if a later step fails
const { transactionId } = ctx.steps.charge as { transactionId: string };
await refundPayment(transactionId);
},
})
.waitFor('manager-approval')
.step('ship', async (ctx) => {
return { tracking: 'TRACK-789' };
});

Start the engine and run it:

const engine = new Engine({ embedded: true });
engine.register(order);
await engine.start();
const run = await engine.run('order-pipeline', { orderId: 'ORD-1' });

That is the entire setup. No YAML, no JSON state machines, no separate orchestrator process.

The saga pattern is first-class. Each step can declare a compensate handler. When any step fails, bunqueue runs compensation handlers in reverse order — exactly like a distributed transaction rollback:

step1 ✅ → step2 ✅ → step3 ❌
compensate(step2) → compensate(step1)

Compensation handlers receive the same context with all previous step results, so they have everything needed to undo their work.

Not every workflow is linear. Use .branch() and .path() to route execution based on runtime data:

const kyc = new Workflow('kyc-onboarding')
.step('score', async (ctx) => {
return { riskLevel: evaluateRisk(ctx.input) };
})
.branch((ctx) => {
const score = ctx.steps.score as { riskLevel: string };
return score.riskLevel; // 'low', 'medium', or 'high'
})
.path('low', (w) => w.step('auto-approve', async () => ({ approved: true })))
.path('medium', (w) => w.step('manual-review', async () => ({ reviewRequired: true })))
.path('high', (w) => w.step('escalate', async () => ({ escalated: true })))
.step('finalize', async (ctx) => {
return { completed: true };
});

Some workflows need a human decision before continuing. .waitFor() pauses execution until an external signal arrives:

const deploy = new Workflow('deploy')
.step('build', async (ctx) => { /* ... */ })
.step('test', async (ctx) => { /* ... */ })
.waitFor('qa-approval')
.step('deploy-prod', async (ctx) => { /* ... */ });
// Later, when QA approves:
await engine.signal(runId, 'qa-approval', { approvedBy: 'qa-lead' });

The workflow resumes exactly where it left off. Execution state is persisted in SQLite, so it survives process restarts.

Step Retry, Parallel Steps & Sub-Workflows

Section titled “Step Retry, Parallel Steps & Sub-Workflows”

Steps retry automatically with exponential backoff:

.step('call-api', async () => {
const res = await fetch('https://api.flaky.com/data');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return await res.json();
}, { retry: 5, timeout: 10000 })

Run independent steps concurrently with .parallel():

.parallel((w) => w
.step('fetch-orders', async () => await db.orders.list())
.step('fetch-prefs', async () => await db.preferences.get())
.step('fetch-activity', async () => await analytics.recent())
)

Compose workflows by nesting them with .subWorkflow():

const parent = new Workflow('order')
.step('create', async () => ({ total: 99 }))
.subWorkflow('payment', (ctx) => ({
amount: (ctx.steps['create'] as { total: number }).total,
}))
.step('confirm', async (ctx) => {
const result = ctx.steps['sub:payment'];
return { done: true };
});

Subscribe to typed events for monitoring:

engine.on('step:failed', (e) => alerting.send(e));
engine.onAny((e) => metrics.increment(`workflow.${e.type}`));

Eleven event types cover the full lifecycle: workflow:started, step:completed, step:retry, signal:timeout, and more.

Iterate with doUntil / doWhile, process lists with forEach, and transform data with map:

const pipeline = new Workflow('etl')
.step('fetch', async () => ({ records: await db.getAll() }))
.forEach(
(ctx) => (ctx.steps['fetch'] as any).records,
'enrich',
async (ctx) => {
const record = ctx.steps.__item;
return await enrichment.process(record);
},
)
.map('aggregate', (ctx) => {
// Collect all forEach results
const results = [];
let i = 0;
while (ctx.steps[`enrich:${i}`]) { results.push(ctx.steps[`enrich:${i}`]); i++; }
return { total: results.length };
})
.doUntil(
(ctx) => (ctx.steps['sync'] as any)?.synced === true,
(w) => w.step('sync', async () => {
const ok = await externalApi.sync();
return { synced: ok };
}),
{ maxIterations: 10 }
);

Validate step inputs/outputs with any .parse() schema (Zod, ArkType, Valibot) and monitor specific executions:

import { z } from 'zod';
const flow = new Workflow('validated')
.step('process', async (ctx) => ({ id: 'usr_1', valid: true }), {
inputSchema: z.object({ email: z.string().email() }),
outputSchema: z.object({ id: z.string(), valid: z.boolean() }),
});
const run = await engine.start('validated', { email: 'user@test.com' });
const unsub = engine.subscribe(run.id, (e) => console.log(e.type));

Archive old executions to keep your SQLite lean:

engine.archive(30 * 24 * 60 * 60 * 1000); // Archive 30-day-old executions
engine.cleanup(7 * 24 * 60 * 60 * 1000); // Delete 7-day-old executions

The workflow engine is built on top of bunqueue’s existing Queue and Worker:

  1. Workflow is a pure data structure — a DAG of step definitions, branch conditions, and wait points
  2. Engine wraps a Queue and Worker pair, using them to schedule and process step jobs
  3. Execution state is stored in SQLite via WorkflowStore, tracking which steps completed, their results, and received signals
  4. Each step runs as a regular bunqueue job with retry, timeout, and stall detection

This means workflows get all of bunqueue’s existing reliability features for free: persistence, heartbeats, DLQ, and S3 backups.

The workflow engine is ideal for:

  • E-commerce pipelines — validate, charge, fulfill, notify with rollback on failure
  • CI/CD deployments — build, test, await approval, deploy
  • Onboarding flows — verify identity, score risk, branch by result
  • Data pipelines — extract, transform, load with per-step error handling
  • Approval workflows — any process that needs human sign-off mid-execution

For simple fire-and-forget jobs, retries, or cron schedules, the standard Queue + Worker API is still the right choice. The workflow engine adds value when you need step dependencies, rollback, or human decisions.

Terminal window
bun add bunqueue
import { Workflow, Engine } from 'bunqueue/workflow';

Read the full guide: Workflow Engine Documentation