# FlowProducer — Job Dependencies, Chains & Parallel Workflows
Create job flows with automatic dependency management: sequential chains, parallel execution with a merge step, and tree structures.
## Basic Usage

```ts
import { FlowProducer } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });
```

## Sequential Chain

Execute jobs in sequence, where each job depends on the previous one completing.
```ts
// A → B → C (sequential execution)
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: { url: 'https://api.example.com' } },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);

console.log('Chain job IDs:', jobIds);
// Jobs execute in order: fetch completes → process starts → store starts
```

## Parallel with Merge

Run multiple jobs in parallel, then execute a final job after all of them complete.
```ts
// task1 ──┐
// task2 ──┼──→ merge
// task3 ──┘
const { parallelIds, finalId } = await flow.addBulkThen(
  [
    { name: 'fetch-api-1', queueName: 'parallel', data: { source: 'api1' } },
    { name: 'fetch-api-2', queueName: 'parallel', data: { source: 'api2' } },
    { name: 'fetch-api-3', queueName: 'parallel', data: { source: 'api3' } },
  ],
  { name: 'merge-results', queueName: 'parallel', data: {} }
);

console.log('Parallel IDs:', parallelIds);
console.log('Final merge job:', finalId);
```

## Tree Structure

Create hierarchical job trees where children depend on their parent.
```ts
const { jobIds } = await flow.addTree({
  name: 'root',
  queueName: 'tree',
  data: { level: 0 },
  children: [
    {
      name: 'branch-1',
      queueName: 'tree',
      data: { level: 1 },
      children: [
        { name: 'leaf-1a', queueName: 'tree', data: { level: 2 } },
        { name: 'leaf-1b', queueName: 'tree', data: { level: 2 } },
      ],
    },
    { name: 'branch-2', queueName: 'tree', data: { level: 1 } },
  ],
});
```

## Accessing Parent Results

Workers can access the results of previous jobs in the chain.
### FlowJobData Type

The `FlowJobData` interface is automatically intersected with your job data type `T` in Worker callbacks. You can also import it explicitly:

```ts
import type { FlowJobData } from 'bunqueue/client';

interface MyJobData extends FlowJobData {
  email: string;
  subject: string;
}
```

```ts
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });

const worker = new Worker('pipeline', async (job) => {
  // Check if this job has a parent (chain scenario).
  // __flowParentId is automatically injected by FlowProducer.
  if (job.data.__flowParentId) {
    const parentResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Parent result:', parentResult);
  }

  // Check if this job has multiple parents (merge scenario).
  // __flowParentIds is automatically injected for merge flows.
  if (job.data.__flowParentIds) {
    const parentResults = flow.getParentResults(job.data.__flowParentIds);
    parentResults.forEach((result, id) => {
      console.log(`Parent ${id}:`, result);
    });
  }

  return { processed: true };
}, { embedded: true });
```
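To see how the injected flow fields compose with your own data, here is a dependency-free sketch. The local `FlowJobData` definition mirrors the documented fields (`__flowParentId`, `__flowParentIds`); their optionality is an assumption, and in real code the type comes from `bunqueue/client`.

```ts
// Local stand-ins for illustration only — import the real types from
// 'bunqueue/client'. Field optionality here is assumed.
interface FlowJobData {
  __flowParentId?: string;
  __flowParentIds?: string[];
}

interface MyJobData extends FlowJobData {
  email: string;
  subject: string;
}

// A worker callback sees both your own fields and the injected flow fields.
function describeJob(data: MyJobData): string {
  const parent = data.__flowParentId ? ` (parent: ${data.__flowParentId})` : '';
  return `${data.subject} -> ${data.email}${parent}`;
}

console.log(describeJob({ email: 'a@example.com', subject: 'Welcome' }));
// Welcome -> a@example.com
console.log(describeJob({ email: 'a@example.com', subject: 'Digest', __flowParentId: 'job-1' }));
// Digest -> a@example.com (parent: job-1)
```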
## Job Options

Each step can have its own options.

```ts
await flow.addChain([
  {
    name: 'fetch',
    queueName: 'pipeline',
    data: { url: '...' },
    opts: {
      priority: 10,
      attempts: 5,
      timeout: 30000,
    },
  },
  {
    name: 'process',
    queueName: 'pipeline',
    data: {},
    opts: {
      delay: 1000, // Wait 1s after fetch completes
    },
  },
]);
```

## FlowStep Interface

```ts
interface FlowStep<T = unknown> {
  name: string;          // Job name
  queueName: string;     // Target queue
  data: T;               // Job data
  opts?: JobOptions;     // Optional job options
  children?: FlowStep[]; // Child steps (for tree structures)
}
```
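Because steps are plain objects, a chain can be generated from data. A minimal, dependency-free sketch: the local `FlowStep` shape mirrors the interface above (with `opts` narrowed to the option names shown on this page), and the URLs, queue name, and step names are illustrative.

```ts
// Local mirror of the FlowStep shape for illustration; in real code the
// resulting array is passed straight to flow.addChain(steps).
interface FlowStep<T = unknown> {
  name: string;
  queueName: string;
  data: T;
  opts?: { priority?: number; attempts?: number; timeout?: number; delay?: number };
}

const urls = ['https://a.example.com', 'https://b.example.com'];

// One fetch step per URL, then a final aggregate step.
const steps: FlowStep[] = [
  ...urls.map((url, i) => ({
    name: `fetch-${i}`,
    queueName: 'pipeline',
    data: { url },
    opts: { attempts: 3, timeout: 10_000 },
  })),
  { name: 'aggregate', queueName: 'pipeline', data: {} },
];

// const { jobIds } = await flow.addChain(steps);
console.log(steps.map((s) => s.name)); // [ 'fetch-0', 'fetch-1', 'aggregate' ]
```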
## BullMQ v5 Compatible API

FlowProducer also supports the BullMQ v5 flow API, in which children are processed before their parent. This is the inverse of the bunqueue-native API above.
### add(flow) — Add a Flow Tree

```ts
const result = await flow.add({
  name: 'parent-job',
  queueName: 'my-queue',
  data: { type: 'report' },
  children: [
    { name: 'child-1', queueName: 'my-queue', data: { section: 'intro' } },
    { name: 'child-2', queueName: 'my-queue', data: { section: 'body' } },
  ],
});

// result.job — the parent Job
// result.children — array of JobNode (each with .job and optional .children)
```

Children complete first; the parent then becomes available for processing. Inside the parent’s worker, use `job.getChildrenValues()` to access child results.
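`getChildrenValues()` resolves to a plain object mapping child job keys to their return values; the key format is an internal detail, so the safe pattern is to aggregate over `Object.values()`. A dependency-free sketch of that aggregation step (the `ChildrenValues` shape and sample keys are illustrative, not the library's actual key format):

```ts
// The values object maps child job keys to child return values. Treat the
// keys as opaque and aggregate over Object.values() rather than hard-coding them.
type ChildrenValues = Record<string, { section: string }>;

function buildReport(values: ChildrenValues): string[] {
  return Object.values(values).map((v) => v.section);
}

// Inside the parent's worker this would be:
//   const values = await job.getChildrenValues();
//   return { report: buildReport(values) };
const sample: ChildrenValues = {
  'my-queue:child-1': { section: 'intro' },
  'my-queue:child-2': { section: 'body' },
};
console.log(buildReport(sample)); // [ 'intro', 'body' ]
```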
### addBulk(flows) — Add Multiple Flows

```ts
const results = await flow.addBulk([
  { name: 'report-1', queueName: 'reports', data: {}, children: [...] },
  { name: 'report-2', queueName: 'reports', data: {}, children: [...] },
]);
```

### getFlow(opts) — Retrieve a Flow Tree

```ts
const tree = await flow.getFlow({
  id: 'job-123',         // Root job ID
  queueName: 'my-queue', // Queue name
  depth: 3,              // Max depth to traverse (default: Infinity)
  maxChildren: 100,      // Max children per node (default: unlimited)
});

if (tree) {
  console.log(tree.job.id);   // Root job
  console.log(tree.children); // Child nodes (recursive)
}
```
## FlowJob Interface

```ts
interface FlowJob<T = unknown> {
  name: string;         // Job name
  queueName: string;    // Target queue
  data: T;              // Job data
  opts?: JobOptions;    // Optional job options
  children?: FlowJob[]; // Child jobs (processed BEFORE parent)
}
```
## JobNode Interface

```ts
interface JobNode<T = unknown> {
  job: Job<T>;          // The job instance
  children?: JobNode[]; // Child nodes
}
```
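Since `JobNode` is a plain recursive structure, the tree returned by `getFlow()` can be walked with an ordinary depth-first traversal. A dependency-free sketch, where the local `Job` and `JobNode` shapes are minimal stand-ins for the real types:

```ts
// Minimal local stand-ins for illustration; the real types come from bunqueue.
interface Job<T = unknown> { id: string; name: string; data: T }
interface JobNode<T = unknown> { job: Job<T>; children?: JobNode<T>[] }

// Depth-first walk collecting every job ID in the flow tree.
function collectJobIds(node: JobNode, out: string[] = []): string[] {
  out.push(node.job.id);
  for (const child of node.children ?? []) collectJobIds(child, out);
  return out;
}

const tree: JobNode = {
  job: { id: 'root', name: 'parent-job', data: {} },
  children: [
    { job: { id: 'c1', name: 'child-1', data: {} } },
    {
      job: { id: 'c2', name: 'child-2', data: {} },
      children: [{ job: { id: 'c2a', name: 'grandchild', data: {} } }],
    },
  ],
};

console.log(collectJobIds(tree)); // [ 'root', 'c1', 'c2', 'c2a' ]
```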
## Methods Reference

| Method | Description |
|---|---|
| `add(flow)` | BullMQ v5: tree where children complete before parent |
| `addBulk(flows[])` | BullMQ v5: add multiple flow trees |
| `getFlow(opts)` | BullMQ v5: retrieve a flow tree by root job ID |
| `addChain(steps[])` | Sequential execution: A → B → C |
| `addBulkThen(parallel[], final)` | Parallel then converge: [A, B, C] → D |
| `addTree(root)` | Hierarchical tree with nested children |
| `getParentResult(parentId)` | Get the result of a single parent job (embedded only) |
| `getParentResults(parentIds[])` | Get the results of multiple parent jobs (embedded only) |
## Complete Example

```ts
import { FlowProducer, Worker, Queue, shutdownManager } from 'bunqueue/client';

// Create queues
const pipelineQueue = new Queue('pipeline', { embedded: true });

// Create flow producer
const flow = new FlowProducer({ embedded: true });

// Create worker
const worker = new Worker('pipeline', async (job) => {
  console.log(`Processing ${job.name}`);

  if (job.name === 'fetch') {
    // Simulate API call
    return { data: [1, 2, 3] };
  }

  if (job.name === 'process') {
    // Access the parent (fetch) result
    const fetchResult = flow.getParentResult(job.data.__flowParentId);
    return { processed: fetchResult.data.map((x) => x * 2) };
  }

  if (job.name === 'store') {
    const processResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Storing:', processResult.processed);
    return { stored: true };
  }

  return {};
}, { embedded: true, concurrency: 3 });

// Add a pipeline
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: {} },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);

console.log('Pipeline started with jobs:', jobIds);

// Cleanup
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```