Building Job Pipelines with FlowProducer

Some tasks aren’t a single job; they’re a pipeline: resize an image, then generate thumbnails, then update the CDN. bunqueue’s FlowProducer lets you express these dependencies as a tree of jobs: children execute first, and the parent job runs only when all of its children have completed.

The FlowProducer API

import { FlowProducer, Worker } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });

FlowProducer supports the BullMQ v5 API for defining job trees with parent-child relationships.

Basic Flow: Parent Waits for Children

The most common pattern: a parent job that depends on multiple child jobs.

const result = await flow.add({
  name: 'generate-report',
  queueName: 'reports',
  data: { reportId: 'q4-2024' },
  children: [
    {
      name: 'fetch-sales',
      queueName: 'data-fetch',
      data: { source: 'sales-db', quarter: 'Q4' },
    },
    {
      name: 'fetch-expenses',
      queueName: 'data-fetch',
      data: { source: 'expense-db', quarter: 'Q4' },
    },
    {
      name: 'fetch-metrics',
      queueName: 'data-fetch',
      data: { source: 'analytics', quarter: 'Q4' },
    },
  ],
});
console.log(result.job.id); // Parent job ID
console.log(result.children?.length); // 3 children

The execution order:

  1. All three fetch-* children start processing (in parallel)
  2. When all children complete, generate-report becomes available
  3. A worker picks up generate-report and can access children’s results
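
The ordering above boils down to counting outstanding children per parent. The following is a minimal sketch of that idea, not bunqueue's actual implementation; the `PendingChildren` class and its method names are hypothetical and exist only for illustration.

```typescript
// Hypothetical in-memory model of "parent waits for children".
// This is an illustration only, not bunqueue's internal code.
class PendingChildren {
  private remaining = new Map<string, number>();

  // Register a parent together with the number of children it waits for.
  register(parentId: string, childCount: number): void {
    this.remaining.set(parentId, childCount);
  }

  // Record one finished child. Returns true once the last child has
  // completed, which is the moment the parent becomes available.
  childCompleted(parentId: string): boolean {
    const current = this.remaining.get(parentId);
    if (current === undefined) {
      throw new Error(`unknown parent: ${parentId}`);
    }
    if (current > 1) {
      this.remaining.set(parentId, current - 1);
      return false;
    }
    this.remaining.delete(parentId);
    return true;
  }
}

const tracker = new PendingChildren();
tracker.register('generate-report', 3);

const afterSales = tracker.childCompleted('generate-report');
const afterExpenses = tracker.childCompleted('generate-report');
const afterMetrics = tracker.childCompleted('generate-report');
```

Only the third call reports the parent as ready, which mirrors step 2 of the list: the order in which the children finish does not matter, only that all of them have.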

Accessing Children’s Results

The parent job can retrieve the results of its children:

const reportWorker = new Worker('reports', async (job) => {
  // Get all children's return values
  const childResults = await job.getChildrenValues();
  // childResults is a Record<string, unknown>
  // Keys are in "{queueName}:{jobId}" format
  // Note: reading values by position assumes the entries come back in the
  // order the children were declared, which may not hold in general
  const [salesData, expenseData, metricsData] = Object.values(childResults);
  return generateReport(salesData, expenseData, metricsData);
}, { embedded: true });
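
Because the keys are job IDs rather than job names, reading values by position can be fragile. One possible alternative, sketched below, is to have each child include its `source` in its return value and index the results by that field. The `FetchResult` shape, the `indexBySource` helper, and the sample job IDs are assumptions made for this example, not part of bunqueue.

```typescript
// Assumed shape of what each fetch-* child returns (hypothetical).
interface FetchResult {
  source: string;
  rows: unknown[];
}

// Runtime check, since getChildrenValues() yields values typed as unknown.
function isFetchResult(value: unknown): value is FetchResult {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as { source?: unknown; rows?: unknown };
  return typeof candidate.source === 'string' && Array.isArray(candidate.rows);
}

// Index children's results by their self-reported source, so lookups
// do not depend on key order. Values of any other shape are skipped.
function indexBySource(childResults: Record<string, unknown>): Map<string, FetchResult> {
  const bySource = new Map<string, FetchResult>();
  for (const value of Object.values(childResults)) {
    if (isFetchResult(value)) bySource.set(value.source, value);
  }
  return bySource;
}

// Sample data shaped like the Record described above; job IDs are made up.
const sample: Record<string, unknown> = {
  'data-fetch:17': { source: 'analytics', rows: [3] },
  'data-fetch:15': { source: 'sales-db', rows: [1, 2] },
  'data-fetch:16': { source: 'expense-db', rows: [] },
};

const indexed = indexBySource(sample);
const salesRows = indexed.get('sales-db')?.rows ?? [];
```

Inside the worker, the same helper could be applied to the value returned by `job.getChildrenValues()`.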

Nested Flows (Multi-Level Trees)

Children can have their own children, creating deep dependency trees:

await flow.add({
  name: 'deploy',
  queueName: 'deployment',
  data: { version: '2.1.0' },
  children: [
    {
      name: 'build',
      queueName: 'ci',
      data: { step: 'build' },
      children: [
        {
          name: 'lint',
          queueName: 'ci',
          data: { step: 'lint' },
        },
        {
          name: 'test',
          queueName: 'ci',
          data: { step: 'test' },
        },
      ],
    },
    {
      name: 'migrate-db',
      queueName: 'db',
      data: { migration: '045_add_index' },
    },
  ],
});

Execution order:

  1. lint and test run in parallel
  2. When both complete, build runs
  3. migrate-db has no children, so it can start right away, alongside lint and test, without waiting for build
  4. When both build and migrate-db complete, deploy runs
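
To reason about larger trees, it can help to compute the earliest "wave" in which each job could run: leaves are wave 0, and every other job sits one wave above its deepest child. The sketch below does this for the deploy tree. `FlowNode` and `executionWaves` are hypothetical names for this illustration, and the waves are only a planning aid, since a job becomes available as soon as its own children finish rather than when a whole wave does.

```typescript
// Minimal tree shape for this sketch (hypothetical, names only).
interface FlowNode {
  name: string;
  children?: FlowNode[];
}

// Group job names by height in the tree: leaves first, root last.
function executionWaves(root: FlowNode): string[][] {
  const waves: string[][] = [];

  const visit = (node: FlowNode): number => {
    // Post-order: children are visited before their parent.
    const childHeights = (node.children ?? []).map(visit);
    const height = childHeights.length === 0 ? 0 : Math.max(...childHeights) + 1;
    const wave = waves[height] ?? (waves[height] = []);
    wave.push(node.name);
    return height;
  };

  visit(root);
  return waves;
}

const deployTree: FlowNode = {
  name: 'deploy',
  children: [
    { name: 'build', children: [{ name: 'lint' }, { name: 'test' }] },
    { name: 'migrate-db' },
  ],
};

const waves = executionWaves(deployTree);
```

For this tree the first wave contains lint, test, and migrate-db, the second contains build, and the last contains deploy.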

Chain Pattern: Sequential Steps

For strictly sequential pipelines:

// Using nested children for a chain
await flow.add({
  name: 'step-3-notify',
  queueName: 'pipeline',
  data: { step: 3 },
  children: [
    {
      name: 'step-2-process',
      queueName: 'pipeline',
      data: { step: 2 },
      children: [
        {
          name: 'step-1-fetch',
          queueName: 'pipeline',
          data: { step: 1 },
        },
      ],
    },
  ],
});
// Executes: step-1 → step-2 → step-3
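
Writing a chain by hand means listing the steps inside out, which gets hard to read beyond a few levels. A small helper can build the nesting from a list given in execution order. This is a sketch under assumptions: `buildChain` and the local `FlowJob` interface are hypothetical and only mirror the fields used in the example above.

```typescript
// Local stand-in for the flow definition shape used above (assumed fields).
interface FlowJob {
  name: string;
  queueName: string;
  data: { step: number };
  children?: FlowJob[];
}

// Build a strictly sequential flow from step names in execution order.
// The first step ends up as the innermost child, the last step as the root.
function buildChain(queueName: string, stepNames: string[]): FlowJob {
  const [first, ...rest] = stepNames;
  if (first === undefined) {
    throw new Error('a chain needs at least one step');
  }
  let node: FlowJob = { name: first, queueName, data: { step: 1 } };
  rest.forEach((name, index) => {
    // Each later step wraps everything built so far as its only child.
    node = { name, queueName, data: { step: index + 2 }, children: [node] };
  });
  return node;
}

const chain = buildChain('pipeline', ['step-1-fetch', 'step-2-process', 'step-3-notify']);
```

The resulting object has the same structure as the handwritten definition above and could be passed to `flow.add(chain)`.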

Bulk Flows

Add multiple independent flows at once:

const results = await flow.addBulk([
  {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId: 'A001' },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 'A001' } },
      { name: 'check-stock', queueName: 'inventory', data: { orderId: 'A001' } },
    ],
  },
  {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId: 'A002' },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 'A002' } },
      { name: 'check-stock', queueName: 'inventory', data: { orderId: 'A002' } },
    ],
  },
]);
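
When the flows in a bulk call differ only in their data, a small factory keeps the definitions in one place. The sketch below generates the same two order flows from a list of IDs; `orderFlow` and the local `FlowJob` interface are hypothetical names for this example.

```typescript
// Local stand-in for the flow definition shape used above (assumed fields).
interface FlowJob {
  name: string;
  queueName: string;
  data: { orderId: string };
  children?: FlowJob[];
}

// Build the order-processing flow for a single order ID.
function orderFlow(orderId: string): FlowJob {
  return {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId } },
      { name: 'check-stock', queueName: 'inventory', data: { orderId } },
    ],
  };
}

// One flow definition per order; each flow is independent of the others.
const orderFlows = ['A001', 'A002'].map(orderFlow);
```

The `orderFlows` array has the same shape as the literal array above and could be handed to `flow.addBulk(orderFlows)`.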

Retrieving Flow State

Inspect a flow tree and its current state:

const tree = await flow.getFlow({
  id: parentJobId,
  queueName: 'reports',
  depth: 3, // How deep to traverse
  maxChildren: 100, // Max children per level
});
// tree.job - the parent job details
// tree.children - array of child nodes (recursive)
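
Since the returned tree is recursive, a short recursive function is enough to print or inspect it. The sketch below works on a hand-built sample rather than a live flow. The `JobNode` interface assumes only the `job` and `children` fields mentioned above plus an `id` and `name` on the job, and `describeFlow` is a hypothetical helper.

```typescript
// Assumed minimal node shape: job details plus optional child nodes.
interface JobNode {
  job: { id: string; name: string };
  children?: JobNode[];
}

// Produce one indented line per job, parents before their children.
function describeFlow(node: JobNode, depth = 0): string[] {
  const lines = [`${'  '.repeat(depth)}${node.job.name} (${node.job.id})`];
  for (const child of node.children ?? []) {
    lines.push(...describeFlow(child, depth + 1));
  }
  return lines;
}

// Hand-built sample; the IDs are made up for illustration.
const sampleTree: JobNode = {
  job: { id: '10', name: 'generate-report' },
  children: [
    { job: { id: '11', name: 'fetch-sales' } },
    { job: { id: '12', name: 'fetch-expenses' } },
  ],
};

const outline = describeFlow(sampleTree);
console.log(outline.join('\n'));
```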

Error Handling in Flows

Control how child failures affect the parent:

await flow.add({
  name: 'parent',
  queueName: 'main',
  data: {},
  children: [
    {
      name: 'critical-child',
      queueName: 'tasks',
      data: {},
      opts: {
        failParentOnFailure: true, // Parent fails if this child fails
      },
    },
    {
      name: 'optional-child',
      queueName: 'tasks',
      data: {},
      opts: {
        ignoreDependencyOnFailure: true, // Parent proceeds even if this fails
      },
    },
  ],
});
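
The two options can be read as a small decision table over the children's final outcomes. The sketch below models that table as a pure function. It is not bunqueue's code: `resolveParent` and its types are hypothetical, and the default case (a failed child with neither option set leaves the parent waiting) follows BullMQ's documented behavior and is assumed, not confirmed, to carry over to bunqueue.

```typescript
// Final outcome of one child plus the options it was added with.
interface ChildReport {
  outcome: 'completed' | 'failed';
  failParentOnFailure?: boolean;
  ignoreDependencyOnFailure?: boolean;
}

type ParentState = 'ready' | 'failed' | 'waiting-children';

// Decide what happens to the parent once every child has finished.
function resolveParent(children: ChildReport[]): ParentState {
  let blocked = false;
  for (const child of children) {
    if (child.outcome === 'completed') continue;
    // A failed critical child fails the parent outright.
    if (child.failParentOnFailure) return 'failed';
    // A failed optional child is dropped from the dependency set.
    // Assumption (BullMQ semantics): any other failure keeps the parent waiting.
    if (!child.ignoreDependencyOnFailure) blocked = true;
  }
  return blocked ? 'waiting-children' : 'ready';
}

const optionalFailed = resolveParent([
  { outcome: 'completed', failParentOnFailure: true },
  { outcome: 'failed', ignoreDependencyOnFailure: true },
]);

const criticalFailed = resolveParent([
  { outcome: 'failed', failParentOnFailure: true },
  { outcome: 'completed', ignoreDependencyOnFailure: true },
]);
```

In the first case the parent proceeds despite the optional child's failure; in the second the critical child's failure propagates to the parent.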

Real-World Example: Image Processing Pipeline

import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });

// Define the pipeline
async function processImage(imageUrl: string) {
  return await flow.add({
    name: 'update-cdn',
    queueName: 'cdn',
    data: { imageUrl },
    children: [
      {
        name: 'generate-thumbnails',
        queueName: 'images',
        data: { imageUrl, sizes: [100, 300, 800] },
        children: [
          {
            name: 'download-original',
            queueName: 'images',
            data: { imageUrl },
          },
        ],
      },
      {
        name: 'extract-metadata',
        queueName: 'images',
        data: { imageUrl },
        opts: { ignoreDependencyOnFailure: true },
      },
    ],
  });
}

// Workers for each queue.
// downloadImage, createThumbnails, extractEXIF, and uploadToCDN are
// application-specific helpers that are not defined here.
new Worker('images', async (job) => {
  switch (job.name) {
    case 'download-original':
      return await downloadImage(job.data.imageUrl);
    case 'generate-thumbnails': {
      // Braces scope the declaration to this case
      const original = await job.getChildrenValues();
      return await createThumbnails(original, job.data.sizes);
    }
    case 'extract-metadata':
      return await extractEXIF(job.data.imageUrl);
    default:
      throw new Error(`Unknown job name: ${job.name}`);
  }
}, { embedded: true });

new Worker('cdn', async (job) => {
  const results = await job.getChildrenValues();
  await uploadToCDN(results);
  return { published: true };
}, { embedded: true });

Dependency Resolution Performance

bunqueue uses event-driven dependency resolution via microtask coalescing. When a child job completes, the parent’s dependency state is updated immediately rather than on a polling interval:

| Pattern | Old polling (100ms) | Event-driven |
| --- | --- | --- |
| Simple parent-child | ~100ms latency | ~28 microseconds |
| Chain (4 levels) | ~300ms | ~28 microseconds |
| Fan-out (1 to 5) | ~100ms | ~31 microseconds |

This makes complex flow trees execute with minimal overhead between steps.
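
To give a feel for what microtask coalescing means, the sketch below collects child-completion events that arrive in the same synchronous burst and handles them in one pass on the next microtask. It is an illustration of the general technique only. `CompletionCoalescer` is a hypothetical class, and bunqueue's real resolver may be structured quite differently.

```typescript
// Hypothetical coalescer: many completion events, one resolution pass.
class CompletionCoalescer {
  private pending = new Set<string>();
  private scheduled = false;
  private onBatch: (parentIds: string[]) => void;

  constructor(onBatch: (parentIds: string[]) => void) {
    this.onBatch = onBatch;
  }

  // Called whenever a child finishes; schedules at most one microtask.
  childCompleted(parentId: string): void {
    this.pending.add(parentId);
    if (!this.scheduled) {
      this.scheduled = true;
      // A resolved promise's callback runs as a microtask, i.e. right
      // after the current synchronous code, with no timer delay.
      Promise.resolve().then(() => this.flush());
    }
  }

  // Deliver every affected parent once. Also callable directly, e.g. in tests.
  flush(): void {
    this.scheduled = false;
    if (this.pending.size === 0) return;
    const batch = Array.from(this.pending);
    this.pending.clear();
    this.onBatch(batch);
  }
}

const batches: string[][] = [];
const coalescer = new CompletionCoalescer((parentIds) => {
  batches.push(parentIds);
});

// Three completions in one burst, two of them for the same parent.
coalescer.childCompleted('generate-report');
coalescer.childCompleted('generate-report');
coalescer.childCompleted('deploy');
```

The three calls schedule a single microtask, and each affected parent appears once in the batch it delivers. Compared with a polling loop, nothing waits for a timer tick, which is the kind of difference the table above measures.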