# Building Job Pipelines with FlowProducer
Some tasks aren’t a single job - they’re a pipeline. Resize an image, then generate thumbnails, then update the CDN. bunqueue’s FlowProducer lets you express these dependencies naturally: children execute first, and the parent job runs only when all children complete.
## The FlowProducer API
```typescript
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });
```

FlowProducer supports the BullMQ v5 API for defining job trees with parent-child relationships.
## Basic Flow: Parent Waits for Children
The most common pattern: a parent job that depends on multiple child jobs.
```typescript
const result = await flow.add({
  name: 'generate-report',
  queueName: 'reports',
  data: { reportId: 'q4-2024' },
  children: [
    {
      name: 'fetch-sales',
      queueName: 'data-fetch',
      data: { source: 'sales-db', quarter: 'Q4' },
    },
    {
      name: 'fetch-expenses',
      queueName: 'data-fetch',
      data: { source: 'expense-db', quarter: 'Q4' },
    },
    {
      name: 'fetch-metrics',
      queueName: 'data-fetch',
      data: { source: 'analytics', quarter: 'Q4' },
    },
  ],
});

console.log(result.job.id);           // Parent job ID
console.log(result.children?.length); // 3 children
```

The execution order:
- All three `fetch-*` children start processing (in parallel)
- When all children complete, `generate-report` becomes available
- A worker picks up `generate-report` and can access the children's results (a sketch of the child-side worker follows)
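For completeness, here is a minimal sketch of the child-side worker. `fetchFromSource` is a hypothetical data-access helper, not part of bunqueue:

```typescript
import { Worker } from 'bunqueue/client';

// Each fetch-* child returns the data it loaded; that return value is what
// the parent later reads via getChildrenValues().
new Worker('data-fetch', async (job) => {
  // fetchFromSource is a hypothetical helper standing in for your own data layer.
  const rows = await fetchFromSource(job.data.source, job.data.quarter);
  return { source: job.data.source, rows };
}, { embedded: true });
```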
## Accessing Children’s Results
The parent job can retrieve the results of its children:
```typescript
const reportWorker = new Worker('reports', async (job) => {
  // Get all children's return values
  const childResults = await job.getChildrenValues();

  // childResults is a Record<string, unknown>
  // Keys are "{queueName}:{jobId}" format
  const salesData = Object.values(childResults)[0];
  const expenseData = Object.values(childResults)[1];
  const metricsData = Object.values(childResults)[2];

  return generateReport(salesData, expenseData, metricsData);
}, { embedded: true });
```
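Because the keys follow the `{queueName}:{jobId}` format, the order of `Object.values()` is not guaranteed to match the order in which the children were declared. A sketch of a safer variant of the same worker, assuming each child tags its own result as in the `data-fetch` sketch above:

```typescript
new Worker('reports', async (job) => {
  const childResults = await job.getChildrenValues();

  // Index results by the `source` field the children set themselves,
  // instead of relying on key order.
  const bySource: Record<string, unknown> = {};
  for (const value of Object.values(childResults)) {
    const tagged = value as { source: string; rows: unknown };
    bySource[tagged.source] = tagged.rows;
  }

  return generateReport(bySource['sales-db'], bySource['expense-db'], bySource['analytics']);
}, { embedded: true });
```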
## Nested Flows (Multi-Level Trees)

Children can have their own children, creating deep dependency trees:
```typescript
await flow.add({
  name: 'deploy',
  queueName: 'deployment',
  data: { version: '2.1.0' },
  children: [
    {
      name: 'build',
      queueName: 'ci',
      data: { step: 'build' },
      children: [
        {
          name: 'lint',
          queueName: 'ci',
          data: { step: 'lint' },
        },
        {
          name: 'test',
          queueName: 'ci',
          data: { step: 'test' },
        },
      ],
    },
    {
      name: 'migrate-db',
      queueName: 'db',
      data: { migration: '045_add_index' },
    },
  ],
});
```

Execution order:

- `lint` and `test` run in parallel
- When both complete, `build` runs
- `migrate-db` also runs in parallel with `build`
- When both `build` and `migrate-db` complete, `deploy` runs (a sketch of the `deployment` worker follows)
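As a hedged sketch, the `deployment` worker can tell its two direct children apart using the queue name encoded in the `{queueName}:{jobId}` keys. `releaseVersion` is a hypothetical deployment helper:

```typescript
new Worker('deployment', async (job) => {
  const children = await job.getChildrenValues();

  // Direct children live on the 'ci' and 'db' queues, so their keys
  // start with "ci:" and "db:" respectively.
  const entries = Object.entries(children);
  const buildResult = entries.find(([key]) => key.startsWith('ci:'))?.[1];
  const migrateResult = entries.find(([key]) => key.startsWith('db:'))?.[1];

  // releaseVersion is a hypothetical helper for the final deploy step.
  return await releaseVersion(job.data.version, { buildResult, migrateResult });
}, { embedded: true });
```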
## Chain Pattern: Sequential Steps
For strictly sequential pipelines:
```typescript
// Using nested children for a chain
await flow.add({
  name: 'step-3-notify',
  queueName: 'pipeline',
  data: { step: 3 },
  children: [
    {
      name: 'step-2-process',
      queueName: 'pipeline',
      data: { step: 2 },
      children: [
        {
          name: 'step-1-fetch',
          queueName: 'pipeline',
          data: { step: 1 },
        },
      ],
    },
  ],
});
// Executes: step-1 → step-2 → step-3
```
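One worker can serve all three steps, and each step can read the previous step's return value. A minimal sketch, where `fetchInput`, `processInput`, and `sendNotification` are hypothetical stage handlers:

```typescript
new Worker('pipeline', async (job) => {
  // In a chain, a job has at most one child: the previous step.
  const previous = Object.values(await job.getChildrenValues())[0];

  switch (job.data.step) {
    case 1:
      return await fetchInput();
    case 2:
      return await processInput(previous);
    case 3:
      return await sendNotification(previous);
  }
}, { embedded: true });
```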
## Bulk Flows

Add multiple independent flows at once:
```typescript
const results = await flow.addBulk([
  {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId: 'A001' },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 'A001' } },
      { name: 'check-stock', queueName: 'inventory', data: { orderId: 'A001' } },
    ],
  },
  {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId: 'A002' },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 'A002' } },
      { name: 'check-stock', queueName: 'inventory', data: { orderId: 'A002' } },
    ],
  },
]);
```
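Assuming `addBulk` resolves to one `{ job, children }` node per flow, mirroring the shape returned by `add`, the results can be inspected like this:

```typescript
for (const node of results) {
  // Parent job ID and number of direct children for each flow
  console.log(node.job.id, node.children?.length);
}
```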
## Retrieving Flow State

Inspect a flow tree and its current state:
```typescript
const tree = await flow.getFlow({
  id: parentJobId,
  queueName: 'reports',
  depth: 3,         // How deep to traverse
  maxChildren: 100, // Max children per level
});

// tree.job      - the parent job details
// tree.children - array of child nodes (recursive)
```
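A small sketch that walks the returned tree recursively; it assumes every node exposes the same `{ job, children }` shape as the root and that `job` carries at least `id` and `name`:

```typescript
type FlowNode = { job: { id: string; name: string }; children?: FlowNode[] };

function printTree(node: FlowNode, indent = ''): void {
  console.log(`${indent}${node.job.name} (${node.job.id})`);
  for (const child of node.children ?? []) {
    printTree(child, indent + '  ');
  }
}

// Cast reflects the assumed node shape above.
printTree(tree as FlowNode);
```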
## Error Handling in Flows

Control how child failures affect the parent:
```typescript
await flow.add({
  name: 'parent',
  queueName: 'main',
  data: {},
  children: [
    {
      name: 'critical-child',
      queueName: 'tasks',
      data: {},
      opts: {
        failParentOnFailure: true, // Parent fails if this child fails
      },
    },
    {
      name: 'optional-child',
      queueName: 'tasks',
      data: {},
      opts: {
        ignoreDependencyOnFailure: true, // Parent proceeds even if this fails
      },
    },
  ],
});
```
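On the parent side, the value for `optional-child` may simply be absent when it failed and the dependency was ignored (an assumption about how ignored failures surface; verify against your setup). A defensive sketch:

```typescript
new Worker('main', async (job) => {
  const children = await job.getChildrenValues();
  const values = Object.values(children);

  // With ignoreDependencyOnFailure, don't assume a fixed number of child
  // values; the ignored child may not have contributed one.
  return { childCount: values.length, results: values };
}, { embedded: true });
```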
## Real-World Example: Image Processing Pipeline

```typescript
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });

// Define the pipeline
async function processImage(imageUrl: string) {
  return await flow.add({
    name: 'update-cdn',
    queueName: 'cdn',
    data: { imageUrl },
    children: [
      {
        name: 'generate-thumbnails',
        queueName: 'images',
        data: { imageUrl, sizes: [100, 300, 800] },
        children: [
          {
            name: 'download-original',
            queueName: 'images',
            data: { imageUrl },
          },
        ],
      },
      {
        name: 'extract-metadata',
        queueName: 'images',
        data: { imageUrl },
        opts: { ignoreDependencyOnFailure: true },
      },
    ],
  });
}

// Workers for each queue
new Worker('images', async (job) => {
  switch (job.name) {
    case 'download-original':
      return await downloadImage(job.data.imageUrl);
    case 'generate-thumbnails': {
      // The only child of generate-thumbnails is download-original,
      // so take its return value rather than passing the whole record.
      const children = await job.getChildrenValues();
      const original = Object.values(children)[0];
      return await createThumbnails(original, job.data.sizes);
    }
    case 'extract-metadata':
      return await extractEXIF(job.data.imageUrl);
  }
}, { embedded: true });

new Worker('cdn', async (job) => {
  const results = await job.getChildrenValues();
  await uploadToCDN(results);
  return { published: true };
}, { embedded: true });
```

## Dependency Resolution Performance
bunqueue uses event-driven dependency resolution via microtask coalescing. When a child job completes, the parent’s dependency state is updated immediately rather than on a polling interval:
| Pattern | Old Polling (100 ms interval) | Event-Driven |
|---|---|---|
| Simple parent-child | ~100 ms | ~28 µs |
| Chain (4 levels) | ~300 ms | ~28 µs |
| Fan-out (1 to 5) | ~100 ms | ~31 µs |
This makes complex flow trees execute with minimal overhead between steps.