# Flow Producer
Create job flows with automatic dependency management: sequential chains, parallel execution with merge, and tree structures.
## Basic Usage
```ts
import { FlowProducer } from 'bunqueue/client';

const flow = new FlowProducer();
```
## Sequential Chain

Execute jobs in sequence where each job depends on the previous one completing.
```ts
// A → B → C (sequential execution)
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: { url: 'https://api.example.com' } },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);

console.log('Chain job IDs:', jobIds);
// Jobs execute in order: fetch completes → process starts → store starts
```
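A chain does nothing until a worker consumes the queue. Here is a minimal worker sketch for the `pipeline` queue; the handler body is illustrative, and the `embedded` option mirrors the complete example below:

```ts
import { Worker } from 'bunqueue/client';

// Each chain step is only delivered after the previous step has completed.
const worker = new Worker('pipeline', async (job) => {
  console.log(`Running step: ${job.name}`);
  return { step: job.name, done: true };
}, { embedded: true });
```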
## Parallel with Merge

Run multiple jobs in parallel, then execute a final job after all of them complete.
```ts
// task1 ──┐
// task2 ──┼──→ merge
// task3 ──┘
const { parallelIds, finalId } = await flow.addBulkThen(
  [
    { name: 'fetch-api-1', queueName: 'parallel', data: { source: 'api1' } },
    { name: 'fetch-api-2', queueName: 'parallel', data: { source: 'api2' } },
    { name: 'fetch-api-3', queueName: 'parallel', data: { source: 'api3' } },
  ],
  { name: 'merge-results', queueName: 'parallel', data: {} }
);

console.log('Parallel IDs:', parallelIds);
console.log('Final merge job:', finalId);
```
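The final merge job can combine the parallel results via `getParentResults` (covered under Accessing Parent Results below). A sketch, assuming `getParentResults` returns a map keyed by parent job ID, with illustrative merge logic:

```ts
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer();

const mergeWorker = new Worker('parallel', async (job) => {
  // __flowParentIds is injected into the final job by addBulkThen.
  if (job.name === 'merge-results' && job.data.__flowParentIds) {
    const parentResults = flow.getParentResults(job.data.__flowParentIds);
    const merged: unknown[] = [];
    parentResults.forEach((result) => merged.push(result));
    return { merged };
  }
  // Parallel fetch jobs: whatever is returned here becomes a parent result.
  return { source: job.data.source };
}, { embedded: true });
```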
## Tree Structure

Create hierarchical job trees where children depend on their parent.
```ts
const { jobIds } = await flow.addTree({
  name: 'root',
  queueName: 'tree',
  data: { level: 0 },
  children: [
    {
      name: 'branch-1',
      queueName: 'tree',
      data: { level: 1 },
      children: [
        { name: 'leaf-1a', queueName: 'tree', data: { level: 2 } },
        { name: 'leaf-1b', queueName: 'tree', data: { level: 2 } },
      ],
    },
    {
      name: 'branch-2',
      queueName: 'tree',
      data: { level: 1 },
    },
  ],
});
```
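Because children depend on their parent, the tree above runs `root` first, then `branch-1` and `branch-2`, then the leaves. A minimal worker sketch for the `tree` queue, with illustrative handler logic:

```ts
import { Worker } from 'bunqueue/client';

// root (level 0) completes → branches (level 1) start → leaves (level 2) start
const treeWorker = new Worker('tree', async (job) => {
  console.log(`Level ${job.data.level}: ${job.name}`);
  return { name: job.name, level: job.data.level };
}, { embedded: true });
```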
## Accessing Parent Results

Workers can access results from previous jobs in a chain or merge flow.
```ts
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer();

const worker = new Worker('pipeline', async (job) => {
  // Check if this job has a parent (chain scenario).
  // __flowParentId is automatically injected by FlowProducer.
  if (job.data.__flowParentId) {
    const parentResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Parent result:', parentResult);
  }

  // Check if this job has multiple parents (merge scenario).
  // __flowParentIds is automatically injected for merge flows.
  if (job.data.__flowParentIds) {
    const parentResults = flow.getParentResults(job.data.__flowParentIds);
    parentResults.forEach((result, id) => {
      console.log(`Parent ${id}:`, result);
    });
  }

  return { processed: true };
}, { embedded: true });
```
## Job Options

Each step can have its own options.
```ts
await flow.addChain([
  {
    name: 'fetch',
    queueName: 'pipeline',
    data: { url: '...' },
    opts: {
      priority: 10,
      attempts: 5,
      timeout: 30000,
    },
  },
  {
    name: 'process',
    queueName: 'pipeline',
    data: {},
    opts: {
      delay: 1000, // Wait 1s after fetch completes
    },
  },
]);
```
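Options work the same way for parallel and tree steps, since every FlowStep accepts `opts` (the values below are illustrative):

```ts
await flow.addBulkThen(
  [
    { name: 'fetch-api-1', queueName: 'parallel', data: { source: 'api1' }, opts: { priority: 5 } },
    { name: 'fetch-api-2', queueName: 'parallel', data: { source: 'api2' }, opts: { attempts: 3 } },
  ],
  { name: 'merge-results', queueName: 'parallel', data: {}, opts: { timeout: 60000 } }
);
```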
## FlowStep Interface

```ts
interface FlowStep<T = unknown> {
  name: string;          // Job name
  queueName: string;     // Target queue
  data: T;               // Job data
  opts?: JobOptions;     // Optional job options
  children?: FlowStep[]; // Child steps (for tree structures)
}
```
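The generic parameter types a step's payload. A sketch, assuming `FlowStep` is exported from `bunqueue/client` (check the package's type exports); `FetchPayload` is a hypothetical payload type:

```ts
import type { FlowStep } from 'bunqueue/client'; // assumed export path

interface FetchPayload {
  url: string;
}

// `data` must now match FetchPayload.
const fetchStep: FlowStep<FetchPayload> = {
  name: 'fetch',
  queueName: 'pipeline',
  data: { url: 'https://api.example.com' },
};
```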
## Methods Reference

| Method | Description |
|---|---|
| `addChain(steps[])` | Sequential execution: A → B → C |
| `addBulkThen(parallel[], final)` | Parallel then converge: [A, B, C] → D |
| `addTree(root)` | Hierarchical tree with nested children |
| `getParentResult(parentId)` | Get result from single parent job |
| `getParentResults(parentIds[])` | Get results from multiple parent jobs |
## Complete Example
```ts
import { FlowProducer, Worker, Queue, shutdownManager } from 'bunqueue/client';

// Create queues
const pipelineQueue = new Queue('pipeline', { embedded: true });

// Create flow producer
const flow = new FlowProducer();

// Create worker
const worker = new Worker('pipeline', async (job) => {
  console.log(`Processing ${job.data.name || job.name}`);

  if (job.name === 'fetch') {
    // Simulate API call
    return { data: [1, 2, 3] };
  }

  if (job.name === 'process') {
    // Access parent result
    const fetchResult = flow.getParentResult(job.data.__flowParentId);
    return { processed: fetchResult.data.map(x => x * 2) };
  }

  if (job.name === 'store') {
    const processResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Storing:', processResult.processed);
    return { stored: true };
  }

  return {};
}, { embedded: true, concurrency: 3 });

// Add a pipeline
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: {} },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);

console.log('Pipeline started with jobs:', jobIds);

// Cleanup
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```