FlowProducer — Job Dependencies, Chains & Parallel Workflows

Create job flows with automatic dependency management: sequential chains, parallel execution with merge, and tree structures.

import { FlowProducer } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });

Execute jobs in sequence where each job depends on the previous one completing.

// A → B → C (sequential execution)
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: { url: 'https://api.example.com' } },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);
console.log('Chain job IDs:', jobIds);
// Jobs execute in order: fetch completes → process runs and completes → store runs
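
To make the "each job depends on the previous one" rule concrete, the sketch below derives the parent/child pairs implied by a list of step names. It is a conceptual model only: `chainEdges` and `Edge` are hypothetical names used for illustration, and nothing here reflects how bunqueue stores dependencies internally.

```typescript
// Conceptual model only; chainEdges is a hypothetical helper, not a bunqueue API.
type Edge = [parent: string, child: string];

// Returns one [parent, child] pair per adjacent pair of steps.
function chainEdges(names: string[]): Edge[] {
  const edges: Edge[] = [];
  let previous: string | undefined;
  for (const name of names) {
    // A step may start only after the step listed before it has completed.
    if (previous !== undefined) edges.push([previous, name]);
    previous = name;
  }
  return edges;
}

const edges = chainEdges(['fetch', 'process', 'store']);
console.log(edges); // pairs: fetch -> process, process -> store
```

A chain of N steps therefore has N - 1 dependencies, and a single-step chain has none.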

Run multiple jobs in parallel, then execute a final job after all complete.

// task1 ──┐
// task2 ──┼──→ merge
// task3 ──┘
const { parallelIds, finalId } = await flow.addBulkThen(
  [
    { name: 'fetch-api-1', queueName: 'parallel', data: { source: 'api1' } },
    { name: 'fetch-api-2', queueName: 'parallel', data: { source: 'api2' } },
    { name: 'fetch-api-3', queueName: 'parallel', data: { source: 'api3' } },
  ],
  { name: 'merge-results', queueName: 'parallel', data: {} }
);
console.log('Parallel IDs:', parallelIds);
console.log('Final merge job:', finalId);
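
The "run the final job after all complete" rule can be pictured as a gate that counts down the outstanding parallel jobs. The sketch below is an in-memory illustration under that assumption; `MergeGate` is a hypothetical class and not part of bunqueue, which handles this bookkeeping for you.

```typescript
// Conceptual model only; MergeGate is a hypothetical class, not a bunqueue API.
class MergeGate {
  private readonly pending: Set<string>;
  private readonly results = new Map<string, unknown>();

  constructor(parallelIds: string[]) {
    this.pending = new Set(parallelIds);
  }

  // Records one finished parallel job. Returns true once every parallel job
  // has reported, which is the moment the final job may start.
  // Unknown or repeated IDs are ignored.
  complete(id: string, result: unknown): boolean {
    if (this.pending.delete(id)) {
      this.results.set(id, result);
    }
    return this.pending.size === 0;
  }

  // Copy of the collected results, keyed by parallel job ID.
  getResults(): Map<string, unknown> {
    return new Map(this.results);
  }
}

const gate = new MergeGate(['api1', 'api2', 'api3']);
gate.complete('api1', { rows: 10 }); // false: two jobs still pending
gate.complete('api2', { rows: 20 }); // false: one job still pending
gate.complete('api3', { rows: 30 }); // true: the merge step may run
```

The order in which the parallel jobs finish does not matter; only the last completion opens the gate.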

Create hierarchical job trees where children depend on their parent.

const { jobIds } = await flow.addTree({
  name: 'root',
  queueName: 'tree',
  data: { level: 0 },
  children: [
    {
      name: 'branch-1',
      queueName: 'tree',
      data: { level: 1 },
      children: [
        { name: 'leaf-1a', queueName: 'tree', data: { level: 2 } },
        { name: 'leaf-1b', queueName: 'tree', data: { level: 2 } },
      ],
    },
    {
      name: 'branch-2',
      queueName: 'tree',
      data: { level: 1 },
    },
  ],
});
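
When a tree definition grows, it can help to list which step depends on which before submitting it. The sketch below walks a tree of the same shape and records each step's depth and parent. `TreeStep`, `TreeEntry` and `listTree` are hypothetical local names; the walk only inspects the plain object and does not call bunqueue.

```typescript
// Local stand-in for the step shape used above; only the fields the walk needs.
interface TreeStep {
  name: string;
  children?: TreeStep[];
}

interface TreeEntry {
  name: string;
  depth: number;
  parent: string | null;
}

// Pre-order walk: a parent is always listed before its children, matching
// the rule that children depend on their parent.
function listTree(root: TreeStep, depth = 0, parent: string | null = null): TreeEntry[] {
  const entries: TreeEntry[] = [{ name: root.name, depth, parent }];
  for (const child of root.children ?? []) {
    entries.push(...listTree(child, depth + 1, root.name));
  }
  return entries;
}

const entries = listTree({
  name: 'root',
  children: [
    { name: 'branch-1', children: [{ name: 'leaf-1a' }, { name: 'leaf-1b' }] },
    { name: 'branch-2' },
  ],
});
for (const entry of entries) {
  console.log(`${'  '.repeat(entry.depth)}${entry.name} (parent: ${entry.parent ?? 'none'})`);
}
```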

Workers can access results from previous jobs in the chain.

The FlowJobData interface is automatically intersected with your job data type T in Worker callbacks. You can also import it explicitly:

import type { FlowJobData } from 'bunqueue/client';
interface MyJobData extends FlowJobData {
  email: string;
  subject: string;
}

import { FlowProducer, Worker } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });
const worker = new Worker('pipeline', async (job) => {
  // Check if this job has a parent (chain scenario)
  // __flowParentId is automatically injected by FlowProducer
  if (job.data.__flowParentId) {
    const parentResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Parent result:', parentResult);
  }
  // Check if this job has multiple parents (merge scenario)
  // __flowParentIds is automatically injected for merge flows
  if (job.data.__flowParentIds) {
    const parentResults = flow.getParentResults(job.data.__flowParentIds);
    parentResults.forEach((result, id) => {
      console.log(`Parent ${id}:`, result);
    });
  }
  return { processed: true };
}, { embedded: true });
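
If a worker handles both chain and merge jobs, the two checks can be folded into one helper that always returns a list of parent IDs. This is a minimal sketch: `FlowFields` and `parentIdsOf` are hypothetical names, and the field types (`string` and `string[]`) are an assumption, since only the field names `__flowParentId` and `__flowParentIds` appear above.

```typescript
// Assumed shape: the field names come from the example above, but their
// exact types in bunqueue's FlowJobData are an assumption.
interface FlowFields {
  __flowParentId?: string;
  __flowParentIds?: string[];
}

// Returns every parent ID attached to a job's data, covering both the
// chain case (single parent) and the merge case (multiple parents).
function parentIdsOf(data: FlowFields): string[] {
  if (data.__flowParentIds && data.__flowParentIds.length > 0) {
    return [...data.__flowParentIds];
  }
  return data.__flowParentId ? [data.__flowParentId] : [];
}

console.log(parentIdsOf({ __flowParentId: 'job-1' }).length); // 1
console.log(parentIdsOf({ __flowParentIds: ['job-1', 'job-2'] }).length); // 2
console.log(parentIdsOf({}).length); // 0
```

A job with no flow fields yields an empty list, so the same loop over parent IDs works for standalone jobs as well.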

Each step can have its own options.

await flow.addChain([
  {
    name: 'fetch',
    queueName: 'pipeline',
    data: { url: '...' },
    opts: {
      priority: 10,
      attempts: 5,
      timeout: 30000,
    },
  },
  {
    name: 'process',
    queueName: 'pipeline',
    data: {},
    opts: {
      delay: 1000, // Wait 1s after fetch completes
    },
  },
]);

interface FlowStep<T = unknown> {
  name: string; // Job name
  queueName: string; // Target queue
  data: T; // Job data
  opts?: JobOptions; // Optional job options
  children?: FlowStep[]; // Child steps (for tree structures)
}
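
Because options are set per step, shared settings tend to get repeated. One way to avoid that is to merge a set of defaults into each step before passing the array to `addChain`. The sketch below assumes local stand-in types: `StepOptions` lists only the option names used in the example above (the real `JobOptions` type may contain more), and `withDefaultOpts` is a hypothetical helper.

```typescript
// Local stand-ins: only the option names shown in the example above.
interface StepOptions {
  priority?: number;
  attempts?: number;
  timeout?: number;
  delay?: number;
}

interface Step<T = unknown> {
  name: string;
  queueName: string;
  data: T;
  opts?: StepOptions;
}

// Hypothetical helper: fills in shared defaults; options set on a step win.
function withDefaultOpts<T>(steps: Step<T>[], defaults: StepOptions): Step<T>[] {
  return steps.map((step) => ({ ...step, opts: { ...defaults, ...step.opts } }));
}

const steps = withDefaultOpts(
  [
    { name: 'fetch', queueName: 'pipeline', data: {}, opts: { attempts: 5 } },
    { name: 'process', queueName: 'pipeline', data: {} },
  ],
  { attempts: 3, timeout: 30000 },
);
console.log(steps.map((s) => s.opts));
```

Here `fetch` keeps its own `attempts: 5` and picks up the default timeout, while `process` receives both defaults.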

FlowProducer also supports the BullMQ v5 flow API, where children are processed before their parent. This is the inverse of addTree above, where children depend on their parent.

const result = await flow.add({
  name: 'parent-job',
  queueName: 'my-queue',
  data: { type: 'report' },
  children: [
    { name: 'child-1', queueName: 'my-queue', data: { section: 'intro' } },
    { name: 'child-2', queueName: 'my-queue', data: { section: 'body' } },
  ],
});
// result.job — the parent Job
// result.children — array of JobNode (each with .job and optional .children)

Children complete first, then the parent becomes available for processing. Inside the parent’s worker, use job.getChildrenValues() to access child results.
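
The children-first rule corresponds to a post-order walk of the flow definition. The sketch below computes one valid completion order for a flow object; `FlowLike` and `completionOrder` are hypothetical names, and the walk only inspects the plain object rather than calling bunqueue.

```typescript
// Local stand-in for the flow shape; only the fields the walk needs.
interface FlowLike {
  name: string;
  children?: FlowLike[];
}

// Post-order walk: every child is listed before its parent, which is one
// valid completion order for a children-first flow.
function completionOrder(flow: FlowLike): string[] {
  const order: string[] = [];
  for (const child of flow.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(flow.name);
  return order;
}

const order = completionOrder({
  name: 'parent-job',
  children: [{ name: 'child-1' }, { name: 'child-2' }],
});
console.log(order.join(', ')); // child-1, child-2, parent-job
```

Siblings have no dependency on each other, so their relative order in the output is arbitrary; the only guarantee modelled here is that a parent comes after all of its children.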

const results = await flow.addBulk([
  { name: 'report-1', queueName: 'reports', data: {}, children: [...] },
  { name: 'report-2', queueName: 'reports', data: {}, children: [...] },
]);

const tree = await flow.getFlow({
  id: 'job-123', // Root job ID
  queueName: 'my-queue', // Queue name
  depth: 3, // Max depth to traverse (default: Infinity)
  maxChildren: 100, // Max children per node (default: unlimited)
});
if (tree) {
  console.log(tree.job.id); // Root job
  console.log(tree.children); // Child nodes (recursive)
}

interface FlowJob<T = unknown> {
  name: string; // Job name
  queueName: string; // Target queue
  data: T; // Job data
  opts?: JobOptions; // Optional job options
  children?: FlowJob[]; // Child jobs (processed BEFORE parent)
}

interface JobNode<T = unknown> {
  job: Job<T>; // The job instance
  children?: JobNode[]; // Child nodes
}
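
Since a `JobNode` is recursive, a small walk is usually enough to inspect a retrieved flow. The sketch below collects the IDs of a node and all of its descendants. `NodeLike` is a minimal local stand-in for `JobNode`, `collectJobIds` is a hypothetical helper, and treating `job.id` as a `string` is an assumption.

```typescript
// Minimal stand-in for JobNode: only the fields this walk reads.
// Assumption: job.id is a string.
interface NodeLike {
  job: { id: string };
  children?: NodeLike[];
}

// Collects the IDs of a node and all of its descendants, depth-first.
function collectJobIds(node: NodeLike): string[] {
  const ids = [node.job.id];
  for (const child of node.children ?? []) {
    ids.push(...collectJobIds(child));
  }
  return ids;
}

const sampleTree: NodeLike = {
  job: { id: 'job-123' },
  children: [
    { job: { id: 'job-124' }, children: [{ job: { id: 'job-126' } }] },
    { job: { id: 'job-125' } },
  ],
};
console.log(collectJobIds(sampleTree).join(', ')); // job-123, job-124, job-126, job-125
```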

Method                          Description
add(flow)                       BullMQ v5: tree where children complete before parent
addBulk(flows[])                BullMQ v5: add multiple flow trees
getFlow(opts)                   BullMQ v5: retrieve a flow tree by root job ID
addChain(steps[])               Sequential execution: A → B → C
addBulkThen(parallel[], final)  Parallel then converge: [A, B, C] → D
addTree(root)                   Hierarchical tree with nested children
getParentResult(parentId)       Get result from single parent job (embedded only)
getParentResults(parentIds[])   Get results from multiple parent jobs (embedded only)

import { FlowProducer, Worker, Queue, shutdownManager } from 'bunqueue/client';
// Create queues
const pipelineQueue = new Queue('pipeline', { embedded: true });
// Create flow producer
const flow = new FlowProducer({ embedded: true });
// Create worker
const worker = new Worker('pipeline', async (job) => {
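  console.log(`Processing ${job.data.name || job.name}`);
  if (job.name === 'fetch') {
    // Simulate API call
    return { data: [1, 2, 3] };
  }
  if (job.name === 'process') {
    // Access the parent (fetch) result. The shape is asserted here and the
    // result may be missing, so fall back to an empty array.
    const fetchResult = flow.getParentResult(job.data.__flowParentId) as
      | { data: number[] }
      | undefined;
    return { processed: (fetchResult?.data ?? []).map((x) => x * 2) };
  }
  if (job.name === 'store') {
    // Access the parent (process) result; it may be missing
    const processResult = flow.getParentResult(job.data.__flowParentId) as
      | { processed: number[] }
      | undefined;
    console.log('Storing:', processResult?.processed);
    return { stored: true };
  }
  return {};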
}, { embedded: true, concurrency: 3 });
// Add a pipeline
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: {} },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);
console.log('Pipeline started with jobs:', jobIds);
// Cleanup
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
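
A parent's result is only known at runtime, so a worker that reads fields from it may want to check the shape first. The sketch below shows one way to do that with a type guard. `FetchResult`, `isFetchResult` and `doubleParentData` are hypothetical names, and the `{ data: number[] }` shape simply mirrors what the `fetch` step returns in the example above.

```typescript
// Shape returned by the 'fetch' step in the example above.
interface FetchResult {
  data: number[];
}

// Type guard: true only for objects with a numeric array in `data`.
function isFetchResult(value: unknown): value is FetchResult {
  if (typeof value !== 'object' || value === null) return false;
  const data = (value as { data?: unknown }).data;
  return Array.isArray(data) && data.every((x) => typeof x === 'number');
}

// Doubles the numbers when the parent result has the expected shape,
// and falls back to an empty list otherwise.
function doubleParentData(parentResult: unknown): number[] {
  return isFetchResult(parentResult) ? parentResult.data.map((x) => x * 2) : [];
}

console.log(doubleParentData({ data: [1, 2, 3] })); // the doubled values 2, 4, 6
console.log(doubleParentData(undefined)); // an empty array
```

Inside a worker, the value passed to `doubleParentData` would be whatever `flow.getParentResult(...)` returns for the job's parent.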