Workflow Engine — Multi-Step Orchestration for Bun
Orchestrate multi-step business processes with a fluent, chainable DSL. Saga compensation, step retry with exponential backoff, parallel execution, conditional branching, nested sub-workflows, human-in-the-loop signals with timeout, loop control flow (doUntil/doWhile), forEach iteration, map transforms, schema validation (Zod-compatible), per-execution subscribe, typed observability events, and cleanup/archival — all built on top of bunqueue’s Queue and Worker. No new infrastructure, no external services, no YAML.
validate ──→ reserve stock ──→ charge payment ──→ send confirmation ↑ ↑ compensate: compensate: release stock refund paymentbunqueue vs Competitors
Section titled “bunqueue vs Competitors”| bunqueue | Temporal | Inngest | Trigger.dev | |
|---|---|---|---|---|
| Definition | TypeScript DSL | TypeScript + decorators | step.run() wrappers | TypeScript functions |
| Infrastructure | None (embedded SQLite) | PostgreSQL + 7 services | Cloud-only (no self-host) | Redis + PostgreSQL |
| Saga compensation | Built-in | Manual | Manual | Manual |
| Human-in-the-loop | .waitFor() + signal() | Signals API | step.waitForEvent() | Waitpoint tokens |
| Branching | .branch().path() | Code-level if/else | Code-level if/else | Code-level if/else |
| Parallel steps | .parallel() | Promise.all | step.run() in parallel | Manual |
| Step retry | Built-in (exponential backoff) | Built-in | Built-in | Built-in |
| Signal timeout | .waitFor(event, { timeout }) | Workflow.await | step.waitForEvent timeout | Manual |
| Nested workflows | .subWorkflow() | Child workflows | step.invoke() | Manual |
| Observability | Typed event emitter | Temporal UI | Inngest dashboard | Dashboard |
| Loops (doUntil/doWhile) | .doUntil() / .doWhile() | Code-level loops | Manual | Manual |
| forEach | .forEach() with indexed results | Code-level loops | Manual | Manual |
| Map transform | .map() | Code-level | Manual | Manual |
| Schema validation | Duck-typed .parse() (Zod, ArkType) | Manual | Built-in | Manual |
| Per-execution subscribe | engine.subscribe(id, cb) | Manual | Webhook | Manual |
| Cleanup/archival | Built-in SQLite archive | Manual | Auto (cloud) | Manual |
| Self-hosted | Yes (zero-config) | Yes (complex) | No | Yes (complex) |
| Pricing | Free (MIT) | Free self-hosted / Cloud $$ | Free tier, then per-execution | Free tier, then $50/mo+ |
| Setup time | bun add bunqueue | Hours to days | Minutes (cloud) | 30min+ self-hosted |
Why bunqueue?
Section titled “Why bunqueue?”- Zero infrastructure. Temporal needs PostgreSQL + 7 services. Trigger.dev needs Redis + PostgreSQL. bunqueue needs nothing — SQLite is embedded.
- Saga pattern is first-class. Every competitor requires you to implement compensation manually. bunqueue runs compensate handlers in reverse order automatically.
- TypeScript-native DSL. No decorators (Temporal), no wrapper functions (Inngest). Just
.step().step().branch().step(). - Same process, same codebase. No separate worker infrastructure, no deployment pipeline for workflow definitions. It’s a library, not a platform.
When to use something else
Section titled “When to use something else”- Multi-region HA with automatic failover — Use Temporal
- Serverless-first with zero ops — Use Inngest
- Already running Redis with BullMQ — Use BullMQ FlowProducer for simple parent-child chains
Quick Start
Section titled “Quick Start”bun add bunqueueimport { Workflow, Engine } from 'bunqueue/workflow';
// Define a workflowconst orderFlow = new Workflow('order-pipeline') .step('validate', async (ctx) => { const { orderId, amount } = ctx.input as { orderId: string; amount: number }; if (amount <= 0) throw new Error('Invalid amount'); return { orderId, validated: true }; }) .step('charge', async (ctx) => { const { orderId } = ctx.steps['validate'] as { orderId: string }; const { amount } = ctx.input as { amount: number }; const txId = await payments.charge(orderId, amount); return { transactionId: txId }; }, { compensate: async () => { // Runs automatically if a later step fails await payments.refund(); }, }) .step('confirm', async (ctx) => { const { transactionId } = ctx.steps['charge'] as { transactionId: string }; await mailer.send('order-confirm', { txId: transactionId }); return { emailSent: true, transactionId }; });
// Create engine and runconst engine = new Engine({ embedded: true });engine.register(orderFlow);
const run = await engine.start('order-pipeline', { orderId: 'ORD-1', amount: 99.99,});
// Check statusconst exec = engine.getExecution(run.id);console.log(exec.state); // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'Core Concepts
Section titled “Core Concepts”Steps are the building blocks. Each step receives a context with the workflow input and all previous step results:
const flow = new Workflow('data-pipeline') .step('extract', async (ctx) => { const { source } = ctx.input as { source: string }; const rawData = await fetchFromSource(source); return { records: rawData.length, data: rawData }; }) .step('transform', async (ctx) => { // Access previous step results via ctx.steps const { data } = ctx.steps['extract'] as { data: RawRecord[] }; const cleaned = data.filter(r => r.valid).map(normalize); return { cleaned, dropped: data.length - cleaned.length }; }) .step('load', async (ctx) => { const { cleaned } = ctx.steps['transform'] as { cleaned: CleanRecord[] }; await db.insertBatch('analytics', cleaned); // Access original input too return { loaded: cleaned.length, source: (ctx.input as { source: string }).source }; });StepContext shape:
| Property | Type | Description |
|---|---|---|
ctx.input | unknown | The input passed to engine.start() |
ctx.steps | Record<string, unknown> | Results from all completed steps (keyed by step name) |
ctx.signals | Record<string, unknown> | Data from received signals (keyed by event name) |
ctx.executionId | string | Unique execution ID |
Every step must return a value (or undefined). The return value becomes available to subsequent steps via ctx.steps['step-name'].
Compensation (Saga Pattern)
Section titled “Compensation (Saga Pattern)”When a step fails, compensation handlers run in reverse order for all previously completed steps. This implements the saga pattern — the industry-standard approach for distributed transactions without two-phase commit.
const flow = new Workflow('money-transfer') .step('debit-source', async (ctx) => { const { from, amount } = ctx.input as { from: string; to: string; amount: number }; await accounts.debit(from, amount); return { debited: true, account: from, amount }; }, { compensate: async (ctx) => { // Undo: credit back the source account const { from, amount } = ctx.input as { from: string; to: string; amount: number }; await accounts.credit(from, amount); console.log('Rolled back: source account credited'); }, }) .step('credit-target', async (ctx) => { const { to, amount } = ctx.input as { from: string; to: string; amount: number }; await accounts.credit(to, amount); return { credited: true, account: to, amount }; }, { compensate: async (ctx) => { // Undo: debit back the target account const { to, amount } = ctx.input as { from: string; to: string; amount: number }; await accounts.debit(to, amount); console.log('Rolled back: target account debited'); }, }) .step('send-receipt', async () => { throw new Error('Email service down'); // → Compensation runs automatically in reverse: // 1. credit-target compensate (debit target) // 2. debit-source compensate (credit source) });How it works:
- Steps A, B, C execute in order
- Step C throws an error
- Engine runs compensation for B, then A (reverse order)
- Execution state becomes
'failed'
Compensation is best-effort — if a compensate handler itself throws, the error is logged but the remaining compensations still run.
Branching
Section titled “Branching”Route execution to different paths based on runtime conditions:
const flow = new Workflow('support-ticket') .step('classify', async (ctx) => { const { message, plan } = ctx.input as { message: string; plan: string }; const sentiment = await analyzeSentiment(message); const priority = plan === 'enterprise' ? 'high' : sentiment < 0 ? 'medium' : 'low'; return { priority }; }) .branch((ctx) => (ctx.steps['classify'] as { priority: string }).priority) .path('high', (w) => w.step('assign-senior', async (ctx) => { const agent = await roster.getAvailable('senior'); await slack.notify(agent, 'Urgent ticket assigned'); return { assignedTo: agent.name, sla: '1h' }; }) ) .path('medium', (w) => w.step('assign-regular', async (ctx) => { const agent = await roster.getAvailable('regular'); return { assignedTo: agent.name, sla: '4h' }; }) ) .path('low', (w) => w.step('auto-reply', async (ctx) => { await mailer.sendTemplate('auto-reply', ctx.input); return { assignedTo: 'bot', sla: '24h' }; }) ) .step('log-ticket', async (ctx) => { // This step always runs, regardless of which branch was taken await auditLog.write('ticket-created', { executionId: ctx.executionId }); return { logged: true }; });Rules:
- The branch function returns a string that matches one of the
.path()names - Only the matching path executes; others are skipped entirely
- Steps after the branch block always run (convergence point)
- Each path can contain multiple steps, nested branches, or
waitForcalls
WaitFor (Human-in-the-Loop)
Section titled “WaitFor (Human-in-the-Loop)”Pause execution until an external signal arrives. This is how you implement approval gates, manual review steps, or any process that needs human input:
const flow = new Workflow('content-publishing') .step('draft', async (ctx) => { const { title, body } = ctx.input as { title: string; body: string }; const draft = await cms.createDraft({ title, body }); await slack.notify('#editorial', `New draft "${title}" ready for review`); return { draftId: draft.id, previewUrl: draft.previewUrl }; }) .waitFor('editorial-review') .step('publish-or-reject', async (ctx) => { const review = ctx.signals['editorial-review'] as { approved: boolean; editor: string; notes?: string; };
const { draftId } = ctx.steps['draft'] as { draftId: string };
if (!review.approved) { await cms.reject(draftId, review.notes); return { status: 'rejected', editor: review.editor }; }
const published = await cms.publish(draftId); return { status: 'published', url: published.url, editor: review.editor }; });
// Start the workflowconst run = await engine.start('content-publishing', { title: 'Announcing Workflow Engine', body: '...',});
// The execution pauses at 'editorial-review' with state: 'waiting'// Your app can show a UI, send a Slack button, expose an API endpoint, etc.
// When the editor makes a decision (could be minutes, hours, or days later):await engine.signal(run.id, 'editorial-review', { approved: true, editor: 'alice@company.com', notes: 'Great article, ship it!',});// → Execution resumes from 'publish-or-reject'Key behaviors:
waitFor('event')transitions the execution tostate: 'waiting'- The execution is persisted to SQLite — it survives process restarts
engine.signal(id, event, payload)stores the payload and resumes execution- The signal data is available in
ctx.signals['event-name'] - You can have multiple
waitForcalls in a single workflow (e.g., multi-stage approvals)
Step Timeout
Section titled “Step Timeout”Prevent steps from running indefinitely:
const flow = new Workflow('api-aggregation') .step('fetch-primary', async () => { const res = await fetch('https://api.primary.com/data'); return await res.json(); }, { timeout: 5000 }) // 5 second timeout .step('fetch-secondary', async () => { const res = await fetch('https://api.secondary.com/data'); return await res.json(); }, { timeout: 10000 }) // 10 second timeout .step('merge', async (ctx) => { const primary = ctx.steps['fetch-primary']; const secondary = ctx.steps['fetch-secondary']; return { ...primary, ...secondary }; });If a step exceeds its timeout, it fails with a "timed out" error. If the step has a compensate handler, compensation runs for all previously completed steps.
Step Retry with Backoff
Section titled “Step Retry with Backoff”Steps can retry automatically with exponential backoff and jitter:
const flow = new Workflow('resilient-pipeline') .step('call-api', async () => { const res = await fetch('https://api.external.com/data'); if (!res.ok) throw new Error(`HTTP ${res.status}`); return await res.json(); }, { retry: 5, // Max 5 attempts (default: 3) timeout: 10000, // 10s per attempt }) .step('process', async (ctx) => { const data = ctx.steps['call-api'] as ApiResponse; return { processed: true }; });Backoff formula: min(500ms * 2^attempt + jitter, 30s). The first retry waits ~500ms, the second ~1s, the third ~2s, capping at 30 seconds. Jitter prevents thundering herds.
The execution tracks attempt count in exec.steps['step-name'].attempts. If all retries are exhausted, the step fails and compensation runs.
Parallel Steps
Section titled “Parallel Steps”Run multiple steps concurrently with .parallel():
const flow = new Workflow('data-enrichment') .step('fetch-user', async (ctx) => { const { userId } = ctx.input as { userId: string }; return await db.users.find(userId); }) .parallel((w) => w .step('fetch-orders', async (ctx) => { const { userId } = ctx.input as { userId: string }; return await db.orders.findByUser(userId); }) .step('fetch-preferences', async (ctx) => { const { userId } = ctx.input as { userId: string }; return await db.preferences.get(userId); }) .step('fetch-activity', async (ctx) => { const { userId } = ctx.input as { userId: string }; return await analytics.getRecent(userId); }) ) .step('merge', async (ctx) => { // All parallel step results are available const orders = ctx.steps['fetch-orders']; const prefs = ctx.steps['fetch-preferences']; const activity = ctx.steps['fetch-activity']; return { profile: { orders, prefs, activity } }; });How it works:
- All steps inside
.parallel()run viaPromise.allSettled - Results from each parallel step are saved to
exec.stepslike normal steps - If any parallel step fails, the entire parallel group fails and compensation runs
- Steps after the parallel block wait for all parallel steps to finish
Signal Timeout
Section titled “Signal Timeout”Add a timeout to waitFor so workflows don’t hang indefinitely:
const flow = new Workflow('time-limited-approval') .step('submit', async (ctx) => { const { amount } = ctx.input as { amount: number }; await slack.notify('#approvals', `Expense $${amount} needs review`); return { submitted: true }; }) .waitFor('manager-approval', { timeout: 86400000 }) // 24 hours .step('process', async (ctx) => { const decision = ctx.signals['manager-approval'] as { approved: boolean }; return { status: decision.approved ? 'paid' : 'rejected' }; });If the signal doesn’t arrive within the timeout:
- The execution state becomes
'failed' - The error is stored in
exec.steps['__waitFor:manager-approval'].error - Compensation runs for all previously completed steps
- A
signal:timeoutevent is emitted
Nested Workflows (Sub-Workflows)
Section titled “Nested Workflows (Sub-Workflows)”Compose workflows by calling child workflows from a parent:
const paymentFlow = new Workflow('payment') .step('validate-card', async (ctx) => { const { cardToken, amount } = ctx.input as { cardToken: string; amount: number }; return { valid: true, amount }; }) .step('charge', async (ctx) => { return { txId: `tx_${Date.now()}` }; }, { compensate: async () => { await payments.refund(); }, });
const orderFlow = new Workflow('order') .step('create-order', async (ctx) => { const { amount } = ctx.input as { amount: number; cardToken: string }; return { orderId: `ORD-${Date.now()}`, total: amount }; }) .subWorkflow('payment', (ctx) => ({ // Map parent context to child input cardToken: (ctx.input as { cardToken: string }).cardToken, amount: (ctx.steps['create-order'] as { total: number }).total, })) .step('confirm', async (ctx) => { // Child results available under 'sub:<workflow-name>' const paymentResult = ctx.steps['sub:payment'] as Record<string, unknown>; return { confirmed: true, payment: paymentResult }; });
const engine = new Engine({ embedded: true });engine.register(paymentFlow); // Register child firstengine.register(orderFlow);
await engine.start('order', { amount: 99, cardToken: 'tok_abc' });Key behaviors:
- The parent workflow pauses while the child executes
- Child workflow results are stored under
ctx.steps['sub:<child-name>'] - If the child fails, the parent fails too (and parent compensation runs)
- The input mapper function receives the parent’s context, allowing you to pass any data from parent steps to the child
Observability (Events)
Section titled “Observability (Events)”Subscribe to typed workflow events for monitoring, logging, and debugging:
const engine = new Engine({ embedded: true });
// Listen to specific event typesengine.on('workflow:started', (event) => { console.log(`Workflow ${event.workflowName} started: ${event.executionId}`);});
engine.on('step:completed', (event) => { const { stepName } = event as StepEvent; console.log(`Step ${stepName} completed in ${event.executionId}`);});
engine.on('workflow:failed', (event) => { alerting.send(`Workflow ${event.workflowName} failed: ${event.executionId}`);});
// Listen to ALL eventsengine.onAny((event) => { metrics.increment(`workflow.${event.type}`, { workflow: event.workflowName, });});
// Or pass onEvent in constructorconst engine2 = new Engine({ embedded: true, onEvent: (event) => logger.info(event),});Available event types:
| Event | When |
|---|---|
workflow:started | engine.start() is called |
workflow:completed | All steps finished successfully |
workflow:failed | A step threw after retries exhausted |
workflow:waiting | Execution paused at a waitFor |
workflow:compensating | Compensation is running |
step:started | A step begins executing |
step:completed | A step finished successfully |
step:failed | A step threw an error |
step:retry | A step is about to retry after failure |
signal:received | engine.signal() delivered a signal |
signal:timeout | A waitFor timed out |
Use engine.off(type, listener) and engine.offAny(listener) to unsubscribe.
Cleanup & Archival
Section titled “Cleanup & Archival”Manage execution history with built-in cleanup and archival:
const engine = new Engine({ embedded: true });
// Delete old completed/failed executions (older than 7 days)const deleted = engine.cleanup(7 * 24 * 60 * 60 * 1000);console.log(`Deleted ${deleted} executions`);
// Or selectively clean only completed executionsengine.cleanup(7 * 24 * 60 * 60 * 1000, ['completed']);
// Archive instead of delete (moves to archive table)const archived = engine.archive(30 * 24 * 60 * 60 * 1000); // 30 daysconsole.log(`Archived ${archived} executions`);
// Check archive countconsole.log(`Total archived: ${engine.getArchivedCount()}`);Cleanup vs Archive:
cleanup(maxAgeMs, states?)— Permanently deletes executions older thanmaxAgeMsarchive(maxAgeMs, states?)— Moves executions to a separateworkflow_executions_archivetable (transactional, up to 1000 per call)- Both accept an optional
statesfilter:['completed'],['failed'],['completed', 'failed'], etc.
Loops (doUntil / doWhile)
Section titled “Loops (doUntil / doWhile)”Repeat a set of steps based on a condition. Two flavors:
doUntil(condition, builder, options?)— Runs steps first, then checks condition. Repeats until condition returnstrue(do…until semantics).doWhile(condition, builder, options?)— Checks condition first, then runs steps. Repeats while condition returnstrue(while…do semantics).
// doUntil: retry sending until delivery confirmedconst flow = new Workflow('delivery') .doUntil( (ctx) => (ctx.steps['send'] as { delivered: boolean })?.delivered === true, (w) => w.step('send', async (ctx) => { const result = await deliveryService.attempt(ctx.input); return { delivered: result.success }; }), { maxIterations: 10 } // safety limit (default: 100) );
// doWhile: process items while queue has itemsconst batchFlow = new Workflow('batch') .doWhile( (ctx) => { const remaining = (ctx.steps['process'] as { remaining: number })?.remaining ?? 10; return remaining > 0; }, (w) => w.step('process', async (ctx) => { const batch = await fetchNextBatch(); await processBatch(batch); return { remaining: await getQueueSize() }; }), );Key behaviors:
doWhilecan skip entirely if the condition isfalseon the first checkdoUntilalways runs at least oncemaxIterationsprevents infinite loops (default: 100)- Loop step results are overwritten each iteration — only the last iteration’s result is available downstream
- Conditions can be async (return a
Promise<boolean>)
forEach
Section titled “forEach”Iterate over a dynamic list of items, executing a step for each:
const flow = new Workflow<{ userIds: string[] }>('notify-all') .forEach( (ctx) => (ctx.input as { userIds: string[] }).userIds, // items extractor 'notify', // step name async (ctx) => { const userId = ctx.steps.__item as string; // current item const index = ctx.steps.__index as number; // current index (0-based) await sendNotification(userId); return { notified: userId }; }, { retry: 3, timeout: 10_000 } // standard step options );Key behaviors:
- Results are stored with indexed names:
notify:0,notify:1,notify:2, etc. - Each iteration receives
__item(current item) and__index(current index) viactx.steps - Items are processed sequentially (not in parallel)
maxIterationsoption limits array size (default: 1000)- Standard step options (
retry,timeout,compensate,inputSchema,outputSchema) apply to each iteration
Transform step results into a new value without executing an async handler. A synchronous, pure data-transform node:
const flow = new Workflow('etl') .step('fetch', async () => { const records = await db.query('SELECT * FROM events'); return { records }; }) .map('aggregate', (ctx) => { const { records } = ctx.steps['fetch'] as { records: Event[] }; return { total: records.length, byType: Object.groupBy(records, r => r.type), }; }) .step('store', async (ctx) => { const agg = ctx.steps['aggregate'] as AggregatedData; await analytics.insert(agg); return { stored: true }; });Key behaviors:
- Runs synchronously (no retry, no timeout) — it’s a pure transform
- Result is stored under the map name (e.g.,
ctx.steps['aggregate']) - The transform function receives the full
StepContext(input, steps, signals)
Schema Validation
Section titled “Schema Validation”Validate step inputs and outputs with any schema library that has a .parse() method (Zod, ArkType, Valibot, etc.):
import { z } from 'zod';
const OrderInput = z.object({ orderId: z.string(), amount: z.number().positive(),});
const ChargeResult = z.object({ transactionId: z.string(), charged: z.number(),});
const flow = new Workflow('validated-order') .step('validate', async (ctx) => { const { orderId } = ctx.input as { orderId: string; amount: number }; return { orderId, validated: true }; }, { inputSchema: OrderInput, // validates ctx.input before handler runs }) .step('charge', async (ctx) => { return { transactionId: 'tx_123', charged: 99.99 }; }, { outputSchema: ChargeResult, // validates return value after handler runs });Key behaviors:
inputSchemavalidatesctx.inputbefore the step handler executesoutputSchemavalidates the handler’s return value after execution- Uses duck typing: any object with a
.parse(data)method works — no runtime dependency on Zod - Validation failure throws an error (triggers retry or compensation like any other step failure)
- Works with Zod, ArkType, Valibot, or any custom schema object
Subscribe
Section titled “Subscribe”Monitor a specific execution’s events in real-time:
const run = await engine.start('order-pipeline', { orderId: 'ORD-1' });
// Subscribe to all events for this executionconst unsubscribe = engine.subscribe(run.id, (event) => { console.log(`[${event.type}]`, event);
if (event.type === 'workflow:completed') { console.log('Order pipeline finished!'); }});
// Later: stop listeningunsubscribe();Key behaviors:
- Returns an
unsubscribefunction — call it to stop receiving events - Only receives events for the specified execution ID (filters automatically)
- Receives all event types:
step:started,step:completed,step:failed,step:retry,workflow:*,signal:* - Complements
engine.on()/engine.onAny()which are global (all executions)
Engine API
Section titled “Engine API”Constructor
Section titled “Constructor”// Embedded mode — everything in-process, no server neededconst engine = new Engine({ embedded: true });
// Embedded with SQLite persistenceconst engine = new Engine({ embedded: true, dataPath: './data/workflows.db',});
// TCP mode — connects to a running bunqueue serverconst engine = new Engine({ connection: { host: 'localhost', port: 6789 },});
// All optionsconst engine = new Engine({ embedded: true, // Use embedded mode (default: false) connection: { port: 6789 }, // TCP server connection (mutually exclusive with embedded) dataPath: './data/wf.db', // SQLite persistence path concurrency: 10, // Max parallel step executions (default: 5) queueName: '__wf:steps', // Internal queue name (default: '__wf:steps') onEvent: (event) => {}, // Global event listener (optional)});Methods
Section titled “Methods”| Method | Returns | Description |
|---|---|---|
engine.register(workflow) | this | Register a workflow definition. Chainable. |
engine.start(name, input?) | Promise<RunHandle> | Start a new execution. Returns { id, workflowName }. |
engine.getExecution(id) | Execution | null | Get full execution state by ID. |
engine.listExecutions(name?, state?) | Execution[] | List executions with optional filters. |
engine.signal(id, event, payload?) | Promise<void> | Send a signal to resume a waiting execution. |
engine.on(type, listener) | void | Subscribe to a specific event type. |
engine.onAny(listener) | void | Subscribe to all events. |
engine.off(type, listener) | void | Unsubscribe from a specific event type. |
engine.offAny(listener) | void | Unsubscribe from all events. |
engine.subscribe(id, callback) | () => void | Subscribe to events for a specific execution. Returns unsubscribe function. |
engine.cleanup(maxAgeMs, states?) | number | Delete executions older than maxAgeMs. Returns count. |
engine.archive(maxAgeMs, states?) | number | Move old executions to archive table. Returns count. |
engine.getArchivedCount() | number | Count of archived executions. |
engine.close(force?) | Promise<void> | Shut down engine, queue, and worker. |
Execution State
Section titled “Execution State”const exec = engine.getExecution(run.id);
exec.id; // 'wf_abc123' — unique execution IDexec.workflowName; // 'order-pipeline'exec.state; // 'running' | 'completed' | 'failed' | 'waiting' | 'compensating'exec.input; // { orderId: 'ORD-1', amount: 99.99 }exec.steps; // Step-by-step status and results:// {// 'validate': { status: 'completed', result: { orderId: 'ORD-1', validated: true } },// 'charge': { status: 'completed', result: { transactionId: 'tx_abc' } },// 'confirm': { status: 'running' }// }exec.signals; // { 'manager-approval': { approved: true } }exec.createdAt; // 1712700000000exec.updatedAt; // 1712700005000Execution states:
| State | Meaning |
|---|---|
running | Steps are being executed |
completed | All steps finished successfully |
failed | A step threw an error (compensation has run) |
waiting | Paused at a waitFor, waiting for a signal |
compensating | Compensation handlers are running |
Real-World Examples
Section titled “Real-World Examples”E-commerce Order Pipeline
Section titled “E-commerce Order Pipeline”A complete order flow with inventory reservation, payment processing, and automatic rollback on failure:
const orderFlow = new Workflow<{ orderId: string; items: Item[]; amount: number }>('process-order') .step('validate-order', async (ctx) => { const { orderId, items, amount } = ctx.input as OrderInput;
if (items.length === 0) throw new Error('Empty cart'); if (amount <= 0) throw new Error('Invalid amount');
// Check all items are in catalog for (const item of items) { const exists = await catalog.exists(item.sku); if (!exists) throw new Error(`Unknown SKU: ${item.sku}`); }
return { orderId, itemCount: items.length, amount }; }) .step('reserve-inventory', async (ctx) => { const { items } = ctx.input as OrderInput; const reservationId = await inventory.reserveBatch(items); return { reservationId }; }, { retry: 3, // Retry on transient inventory service errors compensate: async (ctx) => { const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string }; await inventory.releaseBatch(reservationId); }, }) .step('process-payment', async (ctx) => { const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder; const charge = await stripe.charges.create({ amount: Math.round(amount * 100), currency: 'usd', metadata: { orderId }, }); return { chargeId: charge.id, receiptUrl: charge.receipt_url }; }, { retry: 5, // Payment gateway can be flaky timeout: 15000, // 15s timeout per attempt compensate: async (ctx) => { const { chargeId } = ctx.steps['process-payment'] as { chargeId: string }; await stripe.refunds.create({ charge: chargeId }); }, }) .step('create-shipment', async (ctx) => { const { orderId, items } = ctx.input as OrderInput; const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string }; const shipment = await shipping.create({ orderId, items, reservationId }); return { trackingNumber: shipment.tracking, carrier: shipment.carrier }; }) .parallel((w) => w .step('send-confirmation', async (ctx) => { const payment = ctx.steps['process-payment'] as { chargeId: string; receiptUrl: string }; const shipment = ctx.steps['create-shipment'] as { trackingNumber: string; carrier: string }; const { email } = ctx.input as { email: string }; await mailer.send('order-confirmation', { to: email, receiptUrl: payment.receiptUrl, tracking: shipment.trackingNumber, }); return { emailSent: true }; }) .step('notify-warehouse', async (ctx) => { const { reservationId } = ctx.steps['reserve-inventory'] as { reservationId: string }; await warehouse.notifyShipment(reservationId); return { warehouseNotified: true }; }) .step('update-analytics', async (ctx) => { const { amount, orderId } = ctx.steps['validate-order'] as ValidatedOrder; await analytics.trackPurchase({ orderId, amount }); return { tracked: true }; }) );What happens on failure:
- If
process-paymentfails after 5 retries →reserve-inventorycompensation runs (items released) - If
create-shipmentfails →process-paymentcompensation runs (refund), thenreserve-inventorycompensation runs (items released) - If any parallel notification step fails → full rollback: refund payment, release inventory
- The
parallel()block sends email, notifies warehouse, and tracks analytics concurrently — much faster than sequential
CI/CD Deployment Pipeline with Approval Gate
Section titled “CI/CD Deployment Pipeline with Approval Gate”Build, test, deploy to staging, wait for manual approval, then deploy to production:
const deployFlow = new Workflow('deploy-pipeline') .step('build', async (ctx) => { const { repo, branch, commitSha } = ctx.input as DeployInput; const build = await ci.triggerBuild({ repo, branch, commitSha }); await ci.waitForBuild(build.id); // Polls until complete return { buildId: build.id, artifact: build.artifactUrl, duration: build.durationMs }; }) .step('run-tests', async (ctx) => { const { buildId } = ctx.steps['build'] as { buildId: string }; const results = await ci.runTestSuite(buildId, { suites: ['unit', 'integration', 'e2e'], parallel: true, });
if (results.failed > 0) { throw new Error(`${results.failed}/${results.total} tests failed`); }
return { passed: results.passed, coverage: results.coverage }; }) .step('deploy-staging', async (ctx) => { const { artifact } = ctx.steps['build'] as { artifact: string }; await k8s.deploy('staging', artifact); const healthCheck = await k8s.waitForHealthy('staging', 60000);
// Notify the team await slack.send('#deploys', { text: `Staging deploy ready for review`, url: `https://staging.example.com`, });
return { env: 'staging', healthy: healthCheck.ok }; }, { compensate: async () => { // Rollback staging to previous version await k8s.rollback('staging'); }, }) .waitFor('production-approval', { timeout: 48 * 60 * 60 * 1000 }) // 48h timeout .step('deploy-production', async (ctx) => { const approval = ctx.signals['production-approval'] as { approver: string; strategy: 'rolling' | 'blue-green' | 'canary'; };
const { artifact } = ctx.steps['build'] as { artifact: string };
// Deploy with the approved strategy await k8s.deploy('production', artifact, { strategy: approval.strategy }); await k8s.waitForHealthy('production', 120000);
await slack.send('#deploys', { text: `Production deploy complete (${approval.strategy})`, approvedBy: approval.approver, });
return { env: 'production', approvedBy: approval.approver, strategy: approval.strategy, }; }, { compensate: async () => { await k8s.rollback('production'); await slack.send('#deploys', { text: 'Production rolled back!' }); }, });
// Usageconst run = await engine.start('deploy-pipeline', { repo: 'myapp', branch: 'release/v2.5', commitSha: 'abc123f',});
// After QA on staging (hours/days later):await engine.signal(run.id, 'production-approval', { approver: 'cto@company.com', strategy: 'canary',});KYC Onboarding with Risk-Based Branching
Section titled “KYC Onboarding with Risk-Based Branching”Different verification paths based on risk scoring — low-risk users get auto-approved, medium-risk need document upload, high-risk go to manual compliance review:
const kycFlow = new Workflow('kyc-onboarding') .step('create-account', async (ctx) => { const { email, name, country } = ctx.input as OnboardingInput; const user = await db.users.create({ email, name, country, status: 'pending' }); return { userId: user.id }; }, { compensate: async (ctx) => { // Delete the account if verification fails const { userId } = ctx.steps['create-account'] as { userId: string }; await db.users.delete(userId); }, }) .step('risk-assessment', async (ctx) => { const { country, email, ip } = ctx.input as OnboardingInput; const score = await riskEngine.assess({ country, email, ip });
return { score, riskLevel: score > 80 ? 'low' : score > 50 ? 'medium' : 'high', }; }) .branch((ctx) => (ctx.steps['risk-assessment'] as { riskLevel: string }).riskLevel) .path('low', (w) => w.step('auto-approve', async () => { return { approved: true, method: 'automatic', verifiedAt: Date.now() }; }) ) .path('medium', (w) => w.step('request-documents', async (ctx) => { const { userId } = ctx.steps['create-account'] as { userId: string }; await mailer.send('document-request', { userId }); return { documentsRequested: true }; }) .waitFor('documents-uploaded') .step('verify-documents', async (ctx) => { const docs = ctx.signals['documents-uploaded'] as { files: string[] }; const verification = await docVerification.check(docs.files);
if (!verification.passed) { throw new Error(`Document verification failed: ${verification.reason}`); }
return { approved: true, method: 'document-review', verifiedAt: Date.now() }; }) ) .path('high', (w) => w.step('flag-compliance', async (ctx) => { const { userId } = ctx.steps['create-account'] as { userId: string }; const { score } = ctx.steps['risk-assessment'] as { score: number }; await complianceQueue.assign({ userId, riskScore: score }); return { flagged: true }; }) .waitFor('compliance-decision') .step('apply-compliance-decision', async (ctx) => { const decision = ctx.signals['compliance-decision'] as { approved: boolean; reviewer: string; notes: string; };
if (!decision.approved) { throw new Error(`Rejected by compliance: ${decision.notes}`); }
return { approved: true, method: 'compliance-review', reviewer: decision.reviewer }; }) ) .step('activate-account', async (ctx) => { const { userId } = ctx.steps['create-account'] as { userId: string }; await db.users.update(userId, { status: 'active', activatedAt: Date.now() }); await mailer.send('welcome', { userId }); return { activated: true }; });Data Pipeline (ETL)
Section titled “Data Pipeline (ETL)”Extract, transform, load with metrics aggregation at each stage:
const etlFlow = new Workflow('daily-etl') .step('extract', async (ctx) => { const { date, sources } = ctx.input as { date: string; sources: string[] }; const records: Record[] = [];
for (const source of sources) { const data = await dataLake.query(source, { date }); records.push(...data); }
return { totalRecords: records.length, sources: sources.length, data: records }; }) .step('transform', async (ctx) => { const { data } = ctx.steps['extract'] as { data: Record[] };
const cleaned = data .filter(r => r.timestamp && r.value !== null) .map(r => ({ ...r, value: normalize(r.value), processedAt: Date.now() }));
const dropped = data.length - cleaned.length;
return { cleanedRecords: cleaned.length, droppedRecords: dropped, data: cleaned }; }) .step('load', async (ctx) => { const { data } = ctx.steps['transform'] as { data: CleanRecord[] }; const extract = ctx.steps['extract'] as { totalRecords: number; sources: number }; const transform = ctx.steps['transform'] as { cleanedRecords: number; droppedRecords: number };
// Batch insert const batches = chunk(data, 1000); for (const batch of batches) { await warehouse.insertBatch('analytics_events', batch); }
return { pipeline: 'daily-etl', date: (ctx.input as { date: string }).date, metrics: { sourcesProcessed: extract.sources, rawRecords: extract.totalRecords, cleanedRecords: transform.cleanedRecords, droppedRecords: transform.droppedRecords, loadedRecords: data.length, }, }; });Putting It Together: ETL with Observability
Section titled “Putting It Together: ETL with Observability”Wire up the ETL pipeline with monitoring and cleanup:
const engine = new Engine({ embedded: true, dataPath: './data/etl.db' });
// Observability: track step durations and failuresengine.on('step:started', (e) => { const { stepName } = e as StepEvent; metrics.startTimer(`etl.${stepName}.duration`);});engine.on('step:completed', (e) => { const { stepName } = e as StepEvent; metrics.stopTimer(`etl.${stepName}.duration`); metrics.increment('etl.steps.completed');});engine.on('step:retry', (e) => { const { stepName, attempt, error } = e as StepEvent; logger.warn(`Retrying ${stepName}, attempt ${attempt}: ${error}`);});engine.on('workflow:failed', (e) => { alerting.pagerduty(`ETL pipeline failed: ${e.executionId}`);});
engine.register(etlFlow);
// Run daily via cronawait engine.start('daily-etl', { date: '2026-04-10', sources: ['clickstream', 'transactions'] });
// Cleanup: archive completed runs older than 30 days, delete archived after 90 daysengine.archive(30 * 24 * 60 * 60 * 1000, ['completed']);engine.cleanup(90 * 24 * 60 * 60 * 1000);Batch Processing with forEach and Map
Section titled “Batch Processing with forEach and Map”Process a list of invoices, aggregate results, and send a summary:
import { z } from 'zod';
const InvoiceInput = z.object({ invoiceIds: z.array(z.string()), batchId: z.string(),});
const invoiceFlow = new Workflow<{ invoiceIds: string[]; batchId: string }>('process-invoices') // Validate input with Zod schema .step('init', async (ctx) => { return { count: (ctx.input as { invoiceIds: string[] }).invoiceIds.length }; }, { inputSchema: InvoiceInput })
// Process each invoice .forEach( (ctx) => (ctx.input as { invoiceIds: string[] }).invoiceIds, 'process-invoice', async (ctx) => { const invoiceId = ctx.steps.__item as string; const result = await billingService.process(invoiceId); return { invoiceId, amount: result.amount, status: result.status }; }, { retry: 3, timeout: 15_000 } )
// Aggregate all invoice results .map('summary', (ctx) => { const results: { amount: number; status: string }[] = []; let i = 0; while (ctx.steps[`process-invoice:${i}`]) { results.push(ctx.steps[`process-invoice:${i}`] as { amount: number; status: string }); i++; } const total = results.reduce((sum, r) => sum + r.amount, 0); const failed = results.filter(r => r.status === 'failed').length; return { total, processed: results.length, failed }; })
// Send summary report .step('report', async (ctx) => { const summary = ctx.steps['summary'] as { total: number; processed: number; failed: number }; await notificationService.send({ channel: '#billing', text: `Batch complete: ${summary.processed} invoices, $${summary.total} total, ${summary.failed} failures`, }); return { reported: true }; });Retry Loop with doUntil
Section titled “Retry Loop with doUntil”Poll an external API until a resource is ready:
const deployFlow = new Workflow<{ deployId: string }>('wait-deploy') .step('trigger', async (ctx) => { const id = (ctx.input as { deployId: string }).deployId; await cloudProvider.triggerDeploy(id); return { deployId: id }; }) .doUntil( (ctx) => (ctx.steps['check-status'] as { ready: boolean })?.ready === true, (w) => w.step('check-status', async (ctx) => { const id = (ctx.steps['trigger'] as { deployId: string }).deployId; const status = await cloudProvider.getDeployStatus(id); // Simulate wait between polls await new Promise((r) => setTimeout(r, 5000)); return { ready: status === 'running', status }; }, { retry: 1, timeout: 30_000 }), { maxIterations: 60 } // max 5 minutes of polling ) .step('verify', async (ctx) => { const id = (ctx.steps['trigger'] as { deployId: string }).deployId; const health = await cloudProvider.healthCheck(id); return { healthy: health.ok }; });How It Works Internally
Section titled “How It Works Internally”The workflow engine is a pure consumer layer built on top of bunqueue. Zero modifications to the core engine.
Workflow DSL (.step / .branch / .waitFor) │ ▼┌──────────────────────────────────────────────────────────────────┐│ Engine ││ ││ ┌─────────────────────────────────────────────────────────┐ ││ │ Executor │ ││ │ • Resolves current node (step/branch/parallel/waitFor/ │ ││ │ doUntil/doWhile/forEach/map) │ ││ │ • Runs step handler with timeout + retry (backoff) │ ││ │ • Schema validation (inputSchema/outputSchema) │ ││ │ • Evaluates branch condition, picks path │ ││ │ • Runs parallel steps via Promise.allSettled │ ││ │ • Executes loops (doUntil/doWhile) with maxIterations │ ││ │ • forEach: iterates items with indexed step names │ ││ │ • map: synchronous data transforms │ ││ │ • Checks signal availability + timeout for waitFor │ ││ │ • Dispatches sub-workflows, polls until complete │ ││ │ • Runs compensation in reverse on failure │ ││ └──────────────────┬──────────────────────────────────────┘ ││ │ ││ ┌──────────────────┼──────────────────────────────────────┐ ││ │ │ │ ││ │ ┌────────┐ ┌───▼────┐ ┌───────────────────────┐ │ ││ │ │ Queue │ │ Worker │ │ Store (SQLite) │ │ ││ │ │__wf: │ │ pulls │ │ workflow_executions │ │ ││ │ │steps │──│ & runs │──│ table: id, state, │ │ ││ │ │ │ │ steps │ │ input, steps, signals │ │ ││ │ └────────┘ └────────┘ └───────────────────────┘ │ ││ │ bunqueue internals (Queue + Worker + SQLite) │ ││ └─────────────────────────────────────────────────────────┘ │└──────────────────────────────────────────────────────────────────┘Execution flow:
engine.start()creates anExecutionrecord in SQLite and enqueues the first step as a job on the internal__wf:stepsqueue- Worker picks up the step job. The Executor loads the execution state, resolves the current node
- Step node: runs the handler with retry + timeout, saves result, enqueues next node
- Branch node: evaluates the condition function, runs the matching path’s steps inline
- Parallel node: runs all steps via
Promise.allSettled, saves all results, then advances - WaitFor node: checks if the signal exists. If not, sets state to
'waiting'and schedules a timeout check if configured - SubWorkflow node: starts a child execution, polls until it reaches a terminal state, saves child results under
sub:<name> - DoUntil/DoWhile node: runs loop steps repeatedly, checking condition before (doWhile) or after (doUntil) each iteration
- ForEach node: extracts items array, runs the step for each with indexed names (
step:0,step:1, …) - Map node: runs a synchronous transform, stores result, then advances
engine.signal()stores the signal payload and re-enqueues the current node- On failure: the Executor walks completed steps in reverse, calling each compensate handler
Each workflow step is a regular bunqueue job. You get all of bunqueue’s features for free: SQLite persistence, concurrency control, and monitoring via the dashboard.
Next Steps
Section titled “Next Steps”- Simple Mode — All-in-one Queue + Worker for simpler use cases
- Queue API — Low-level queue operations
- Flow Producer — Parent-child job dependencies (simpler than workflows)
- MCP Server — Let AI agents orchestrate workflows via natural language
- Examples — More code recipes