Skip to content

Production Use Cases & Background Job Patterns for Bun

Every non-trivial application needs background processing. API requests should return immediately while expensive operations — sending emails, processing images, charging payments, calling external APIs — happen asynchronously in the background.

bunqueue provides the infrastructure for all of these patterns: persistent queues backed by SQLite, automatic retries with exponential backoff, dead letter queues for failed jobs, progress tracking, cron scheduling, rate limiting, and real-time monitoring. Each pattern below is production-ready and uses bunqueue’s embedded mode (zero config, no external services).

AI agents are increasingly used to orchestrate complex automation tasks, but they lack the ability to schedule background work, manage retries, or run recurring operations. bunqueue solves this by exposing its entire queue engine as 73 MCP tools that any AI agent can call directly.

When an agent connects to bunqueue via MCP (Model Context Protocol), it gains the ability to add jobs to queues, create cron schedules, set rate limits, monitor queue health, manage dead letter queues, and — with HTTP handlers — auto-process jobs by making HTTP requests without any external worker process.

"Schedule cleanup at 3 AM"
"Register HTTP handler on meteo"
"Retry all failed jobs"
AI Agent Claude, Cursor, Windsurf
MCP Server 73 tools
bunqueue Engine
Workers / HTTP Handlers
AI AgentClaude, Cursor, Windsurf
MCP Server73 tools
bunqueue Engine
Workers / HTTP Handlers

What AI agents can do with bunqueue:

  • Schedule tasks — create cron jobs, delayed execution, recurring workflows with a single tool call
  • Manage pipelines — push jobs, monitor progress, retry failures, create sequential or parallel workflows
  • Auto-process jobs via HTTP — register an HTTP handler on a queue, and bunqueue calls the endpoint for every job. No external worker needed
  • Consume jobs directly — full pull/ack/fail cycle for inline job processing within the agent session
  • Monitor everything — server stats, memory, Prometheus metrics, DLQ entries, job logs, per-queue breakdowns
  • Control flow — pause/resume queues, set rate limits per second, manage concurrency limits
Terminal window
# One command to connect any MCP-compatible AI client
claude mcp add bunqueue -- bunx bunqueue-mcp

The MCP server runs as a subprocess, communicates via stdio using JSON-RPC, and supports both embedded mode (local SQLite) and TCP mode (connect to a remote bunqueue server). See the MCP Server guide for full setup, all 73 tools, HTTP handler configuration, and real-world examples.


TRIGGERS
API Handlers
Cron Scheduler
Webhooks Receiver
Event Triggers
QUEUE LAYER
emails queue
reports queue
webhooks queue
payments queue
WORKER POOL
Worker x3
Worker x2
Worker x5
Worker x3
OUTPUTS
DLQ
SQLite
Events
QUEUED
ACTIVE processing
COMPLETED
FAILED
RETRYING
retry
max attempts
DLQ
QUEUED
ACTIVE processing
COMPLETED
FAILED
RETRYING
DLQ max attempts

Email is one of the most common background job use cases. Sending email synchronously in an API request is slow (SMTP connections, template rendering, attachment handling) and unreliable (provider outages, rate limits, network errors). A queue decouples the send operation from the request, ensuring emails are delivered reliably even if the provider is temporarily unavailable.

bunqueue handles this with automatic retries and exponential backoff. If SendGrid or your SMTP server returns a 5xx error, the job is retried up to N times with increasing delays. After all attempts are exhausted, the job moves to the dead letter queue (DLQ) where it can be inspected, retried manually, or auto-retried on a schedule. Progress tracking lets you monitor each step of the email pipeline in real time.

Request API / Cron
Queue emails
Worker render + send
SMTP / SendGrid
Success logged
Retry backoff
DLQ alert
RequestAPI / Cron
Queueemails
Workerrender + send
SMTP / SendGrid
Successlogged
Retrybackoff
DLQalert
import { Queue, Worker, shutdownManager } from 'bunqueue/client';
interface EmailJob {
to: string;
template: 'welcome' | 'reset-password' | 'invoice' | 'notification';
data: Record<string, unknown>;
attachments?: { name: string; url: string }[];
}
const emailQueue = new Queue<EmailJob>('emails', {
embedded: true,
defaultJobOptions: {
attempts: 5,
backoff: 2000, // Exponential backoff
removeOnComplete: true,
},
});
const emailWorker = new Worker<EmailJob>('emails', async (job) => {
const { to, template, data, attachments } = job.data;
await job.updateProgress(10, 'Loading template...');
const html = await renderTemplate(template, data);
await job.updateProgress(30, 'Preparing attachments...');
const files = attachments ? await downloadAttachments(attachments) : [];
await job.updateProgress(50, 'Sending email...');
const result = await sendEmail({ to, html, attachments: files });
await job.log(`Delivered to ${to} via ${result.provider}`);
await job.updateProgress(100, 'Delivered');
return {
messageId: result.messageId,
provider: result.provider,
deliveredAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 10 });
// Send welcome email
await emailQueue.add('welcome', {
to: 'user@example.com',
template: 'welcome',
data: { name: 'John', activationLink: 'https://...' },
});
// Send bulk newsletter
const subscribers = await getSubscribers();
await emailQueue.addBulk(
subscribers.map(sub => ({
name: 'newsletter',
data: {
to: sub.email,
template: 'notification',
data: { name: sub.name, content: newsletterContent },
},
}))
);

User Request trigger
Queue reports
Worker
Query DB 10%
Transform 40%
Gen File 70%
Upload S3 90%
Notify email
User RequestReport trigger
Queuereports
Worker
Query DB 10% Transform 40% Gen File 70% Upload S3 90%
NotifyEmail sent

Reports that query large datasets, transform data, generate files, and upload to storage can take minutes to complete. Running this synchronously in an API request would timeout. Instead, the user triggers the report and receives a notification when it’s ready.

bunqueue’s progress tracking is essential here: the worker reports each phase (data fetch, processing, file generation, upload) so the frontend can show a live progress bar. The 10-minute timeout prevents stuck jobs from blocking the queue, and the 2-retry policy handles transient database or storage failures.

interface ReportJob {
type: 'sales' | 'inventory' | 'analytics' | 'audit';
format: 'pdf' | 'xlsx' | 'csv';
filters: {
dateRange: { start: string; end: string };
departments?: string[];
regions?: string[];
};
requestedBy: string;
notifyEmail: string;
}
const reportQueue = new Queue<ReportJob>('reports', {
embedded: true,
defaultJobOptions: {
attempts: 2,
timeout: 600000, // 10 minutes max
},
});
const reportWorker = new Worker<ReportJob>('reports', async (job) => {
const { type, format, filters, requestedBy, notifyEmail } = job.data;
// Phase 1: Query data
await job.updateProgress(10, 'Fetching data...');
await job.log(`Report type: ${type}, format: ${format}`);
const data = await fetchReportData(type, filters);
// Phase 2: Process data
await job.updateProgress(40, `Processing ${data.length} records...`);
const processed = await processData(data, type);
// Phase 3: Generate file
await job.updateProgress(70, `Generating ${format.toUpperCase()}...`);
const file = await generateFile(processed, format);
// Phase 4: Upload to storage
await job.updateProgress(90, 'Uploading to storage...');
const url = await uploadToS3(file, `reports/${job.id}.${format}`);
// Phase 5: Notify user
await job.updateProgress(95, 'Sending notification...');
await sendEmail({
to: notifyEmail,
template: 'report-ready',
data: { reportUrl: url, type, format },
});
await job.updateProgress(100, 'Complete');
return {
url,
size: file.size,
records: data.length,
generatedAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 3 });
// Generate monthly sales report
await reportQueue.add('monthly-sales', {
type: 'sales',
format: 'xlsx',
filters: {
dateRange: { start: '2024-01-01', end: '2024-01-31' },
regions: ['US', 'EU'],
},
requestedBy: 'user-123',
notifyEmail: 'manager@company.com',
});

Event order, user
Queue webhooks
Worker HTTP POST
Partner endpoint
Success 200 OK
Retry 5xx/429, backoff 5s→10s→20s
DLQ 8 fails, auto-retry hourly
Eventorder, user
Queuewebhooks
WorkerHTTP POST
Success 200 OK
Retry 5xx, backoff
DLQ 8 fails

Webhooks are inherently unreliable. Partner endpoints go down, return 5xx errors, or hit rate limits. A robust webhook system must handle all of these cases gracefully. bunqueue provides exactly this: each webhook delivery is a job with configurable retry attempts, exponential backoff, and a dead letter queue for permanent failures.

The DLQ is configured to auto-retry failed webhooks hourly (up to 3 times) before giving up. This means transient outages are handled automatically without any manual intervention. After 7 days, expired DLQ entries are purged. The worker uses AbortSignal.timeout() to prevent hanging connections from blocking the queue.

interface WebhookJob {
endpoint: string;
event: string;
payload: Record<string, unknown>;
headers?: Record<string, string>;
signature?: string;
}
const webhookQueue = new Queue<WebhookJob>('webhooks', {
embedded: true,
defaultJobOptions: {
attempts: 8,
backoff: 5000, // Exponential: 5s, 10s, 20s, 40s...
},
});
// Configure DLQ for permanent failures
webhookQueue.setDlqConfig({
autoRetry: true,
autoRetryInterval: 3600000, // Retry DLQ every hour
maxAutoRetries: 3,
maxAge: 604800000, // Keep for 7 days
});
const webhookWorker = new Worker<WebhookJob>('webhooks', async (job) => {
const { endpoint, event, payload, headers, signature } = job.data;
await job.log(`Delivering ${event} to ${endpoint}`);
const response = await fetch(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Event': event,
'X-Webhook-Signature': signature || '',
'X-Webhook-Delivery': job.id,
...headers,
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30000), // 30s timeout
});
if (!response.ok) {
const body = await response.text();
throw new Error(`HTTP ${response.status}: ${body.slice(0, 200)}`);
}
return {
status: response.status,
deliveredAt: new Date().toISOString(),
attempt: job.attemptsMade,
};
}, { embedded: true, concurrency: 20 });
// Dispatch webhook
await webhookQueue.add('order.created', {
endpoint: 'https://partner.com/webhooks',
event: 'order.created',
payload: {
orderId: 'ORD-123',
total: 99.99,
items: [{ sku: 'ITEM-1', qty: 2 }],
},
signature: generateHmacSignature(payload, secret),
});

Upload S3
Queue images
Worker
thumb 150px
card 400px
full 1200px
og 1200px
Upload CDN
UploadS3 origin
Queueimages
Worker
thumb 150px card 400px full 1200px og 1200px
Upload CDNAll variants

When a user uploads an image, you typically need to generate multiple variants: thumbnails for listings, medium sizes for cards, full resolution for detail views, and OpenGraph images for social sharing. Processing all of these synchronously would make the upload endpoint unbearably slow.

By pushing an image job to bunqueue, the upload returns immediately while a Worker handles the heavy lifting in the background. The worker downloads the source image, runs it through Sharp (or any image processing library) for each variant, uploads the results to a CDN, and saves the URLs as the job result. Progress tracking reports each variant as it completes, and the 2-minute timeout prevents stuck ffmpeg/sharp processes from blocking the queue.

interface ImageJob {
sourceUrl: string;
variants: Array<{
name: string;
width: number;
height: number;
format: 'webp' | 'avif' | 'jpeg';
quality: number;
}>;
destination: string;
metadata?: Record<string, string>;
}
const imageQueue = new Queue<ImageJob>('images', {
embedded: true,
defaultJobOptions: {
attempts: 3,
timeout: 120000, // 2 minutes
},
});
const imageWorker = new Worker<ImageJob>('images', async (job) => {
const { sourceUrl, variants, destination, metadata } = job.data;
const results: Record<string, string> = {};
await job.updateProgress(5, 'Downloading source image...');
const source = await downloadImage(sourceUrl);
const totalVariants = variants.length;
for (let i = 0; i < totalVariants; i++) {
const variant = variants[i];
const progress = 10 + Math.floor((i / totalVariants) * 80);
await job.updateProgress(progress, `Processing ${variant.name}...`);
// Resize and convert
const processed = await sharp(source)
.resize(variant.width, variant.height, { fit: 'cover' })
.toFormat(variant.format, { quality: variant.quality })
.toBuffer();
// Upload to CDN
const path = `${destination}/${variant.name}.${variant.format}`;
const url = await uploadToCDN(processed, path, metadata);
results[variant.name] = url;
await job.log(`Uploaded ${variant.name}: ${url}`);
}
await job.updateProgress(100, 'Complete');
return {
variants: results,
processedAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 5 });
// Process product image
await imageQueue.add('product-image', {
sourceUrl: 'https://uploads.example.com/raw/product-123.jpg',
variants: [
{ name: 'thumbnail', width: 150, height: 150, format: 'webp', quality: 80 },
{ name: 'card', width: 400, height: 300, format: 'webp', quality: 85 },
{ name: 'full', width: 1200, height: 900, format: 'webp', quality: 90 },
{ name: 'og', width: 1200, height: 630, format: 'jpeg', quality: 85 },
],
destination: 'products/123',
metadata: { 'Cache-Control': 'public, max-age=31536000' },
});

Exporting large datasets (hundreds of thousands of records) cannot happen in a single request. The data must be fetched in chunks, written to a temporary file, uploaded to storage, and then the user must be notified. This is a textbook background job.

bunqueue’s progress tracking shines here: each chunk updates the progress percentage, so the user sees real-time feedback on a potentially 30-minute export. The chunked approach prevents memory exhaustion — instead of loading all records into memory, the worker processes 10,000 records at a time. The 30-minute timeout accommodates large exports while preventing infinite hangs.

interface ExportJob {
type: 'users' | 'orders' | 'products' | 'transactions';
format: 'csv' | 'json' | 'parquet';
filters?: Record<string, unknown>;
notifyEmail: string;
chunkSize?: number;
}
const exportQueue = new Queue<ExportJob>('exports', {
embedded: true,
defaultJobOptions: {
attempts: 2,
timeout: 1800000, // 30 minutes
},
});
const exportWorker = new Worker<ExportJob>('exports', async (job) => {
const { type, format, filters, notifyEmail, chunkSize = 10000 } = job.data;
await job.updateProgress(5, 'Counting records...');
const totalCount = await countRecords(type, filters);
const totalChunks = Math.ceil(totalCount / chunkSize);
await job.log(`Exporting ${totalCount} ${type} in ${totalChunks} chunks`);
// Create temporary file
const tempFile = await createTempFile(`export-${job.id}.${format}`);
const writer = await createWriter(tempFile, format);
// Process in chunks
for (let chunk = 0; chunk < totalChunks; chunk++) {
const progress = 10 + Math.floor((chunk / totalChunks) * 80);
await job.updateProgress(progress, `Processing chunk ${chunk + 1}/${totalChunks}...`);
const records = await fetchRecords(type, filters, {
offset: chunk * chunkSize,
limit: chunkSize,
});
await writer.write(records);
}
await writer.close();
// Upload to storage
await job.updateProgress(95, 'Uploading file...');
const url = await uploadToS3(tempFile, `exports/${job.id}.${format}`);
// Send notification
await sendEmail({
to: notifyEmail,
template: 'export-ready',
data: { downloadUrl: url, type, recordCount: totalCount },
});
await job.updateProgress(100, 'Complete');
return {
url,
recordCount: totalCount,
format,
size: await getFileSize(tempFile),
};
}, { embedded: true, concurrency: 2 });
// Export all orders for accounting
await exportQueue.add('accounting-export', {
type: 'orders',
format: 'csv',
filters: {
status: 'completed',
dateRange: { start: '2024-01-01', end: '2024-12-31' },
},
notifyEmail: 'accounting@company.com',
chunkSize: 5000,
});

Event security, order
Queue notifications
Worker
Load Prefs Fan Out
Email
Push FCM
SMS Twilio
In-App
Eventsecurity, order
Queuenotifications
Worker Load Prefs → Fan Out
Email
Push
SMS
In-App

Modern applications need to reach users through multiple channels: email, push notifications, SMS, and in-app messages. Each channel has its own API, failure modes, and rate limits. Processing notifications synchronously would make the triggering action (e.g. “order placed”) extremely slow and fragile — a single SMS provider timeout would block the entire response.

By queuing notification jobs, the triggering action returns instantly while the worker fans out to all configured channels. User preferences are respected (some users opt out of SMS), and individual channel failures don’t block other channels. High-priority notifications (like security alerts) use bunqueue’s priority system to jump ahead of promotional messages in the queue.

interface NotificationJob {
userId: string;
type: 'order_update' | 'promotion' | 'security_alert' | 'reminder';
title: string;
body: string;
data?: Record<string, unknown>;
channels?: ('email' | 'push' | 'sms' | 'in_app')[];
}
const notificationQueue = new Queue<NotificationJob>('notifications', {
embedded: true,
defaultJobOptions: {
attempts: 3,
backoff: 1000,
},
});
const notificationWorker = new Worker<NotificationJob>('notifications', async (job) => {
const { userId, type, title, body, data, channels } = job.data;
// Get user preferences
const user = await getUser(userId);
const prefs = await getNotificationPreferences(userId);
// Determine channels to use
const activeChannels = channels || getDefaultChannels(type);
const allowedChannels = activeChannels.filter(ch => prefs[ch] !== false);
const results: Record<string, boolean> = {};
for (const channel of allowedChannels) {
try {
switch (channel) {
case 'email':
await sendEmail({ to: user.email, subject: title, body });
break;
case 'push':
await sendPushNotification(user.deviceTokens, { title, body, data });
break;
case 'sms':
await sendSMS(user.phone, `${title}: ${body}`);
break;
case 'in_app':
await createInAppNotification(userId, { type, title, body, data });
break;
}
results[channel] = true;
await job.log(`Sent via ${channel}`);
} catch (err) {
results[channel] = false;
await job.log(`Failed ${channel}: ${err.message}`);
}
}
return {
userId,
type,
channels: results,
sentAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 15 });
// Security alert - high priority, all channels
await notificationQueue.add('security-alert', {
userId: 'user-123',
type: 'security_alert',
title: 'New login detected',
body: 'A new device logged into your account from San Francisco, CA',
data: { ip: '192.168.1.1', device: 'Chrome on MacOS' },
channels: ['email', 'push', 'sms'],
}, { priority: 100 });

Order Checkout
Queue payments
Worker
Idempotency Check
yes
Return Cached already done
no
Stripe Charge
Record Txn Audit Log
DLQ: Manual review required (no auto-retry)
Order Checkout
Queuepayments
WorkerPayment processor
?
Idempotency Check
yes
Return Cached
no
Stripe Charge
Record Txn

Payment processing is the most critical background job in any e-commerce system. It must be reliable (never lose a charge), idempotent (never charge twice for the same order), and auditable (every action logged). Processing payments synchronously in the checkout flow is risky: a timeout or crash after the charge but before the database update results in a lost transaction.

bunqueue handles this with the durable: true flag, which bypasses the write buffer and persists the job to SQLite immediately. This guarantees zero data loss even if the process crashes. The idempotency key pattern prevents duplicate charges, and the DLQ is configured with autoRetry: false because failed payments require manual review — you don’t want to auto-retry a charge that failed due to insufficient funds.

interface PaymentJob {
orderId: string;
amount: number;
currency: string;
customerId: string;
paymentMethodId: string;
metadata?: Record<string, string>;
idempotencyKey: string;
}
const paymentQueue = new Queue<PaymentJob>('payments', {
embedded: true,
defaultJobOptions: {
attempts: 3,
backoff: 5000,
timeout: 60000,
},
});
// Configure strict DLQ for failed payments
paymentQueue.setDlqConfig({
autoRetry: false, // Manual review required
maxAge: 2592000000, // Keep for 30 days
});
const paymentWorker = new Worker<PaymentJob>('payments', async (job) => {
const { orderId, amount, currency, customerId, paymentMethodId, idempotencyKey } = job.data;
await job.log(`Processing payment for order ${orderId}`);
// Check idempotency
const existing = await checkIdempotency(idempotencyKey);
if (existing) {
await job.log('Payment already processed (idempotent)');
return existing;
}
// Create payment intent
await job.updateProgress(20, 'Creating payment intent...');
const intent = await stripe.paymentIntents.create({
amount: Math.round(amount * 100),
currency,
customer: customerId,
payment_method: paymentMethodId,
confirm: true,
idempotency_key: idempotencyKey,
});
if (intent.status !== 'succeeded') {
throw new Error(`Payment failed: ${intent.status}`);
}
// Record transaction
await job.updateProgress(80, 'Recording transaction...');
await recordTransaction({
orderId,
paymentIntentId: intent.id,
amount,
currency,
status: 'completed',
});
// Audit log
await createAuditLog({
action: 'payment.completed',
orderId,
amount,
paymentIntentId: intent.id,
jobId: job.id,
});
await job.updateProgress(100, 'Complete');
return {
paymentIntentId: intent.id,
status: 'completed',
processedAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 5 });
// Process order payment - CRITICAL: use durable for guaranteed persistence
await paymentQueue.add('charge', {
orderId: 'ORD-123',
amount: 99.99,
currency: 'usd',
customerId: 'cus_xxx',
paymentMethodId: 'pm_xxx',
idempotencyKey: `order-${orderId}-payment`,
}, { durable: true }); // Immediate disk write - no data loss on crash

Search engines like Elasticsearch or Typesense need to stay in sync with your database. Updating the index synchronously on every database write adds latency and creates a tight coupling between your application and the search service. If Elasticsearch is down, your writes would fail.

By pushing index operations to a queue, database writes return immediately while bunqueue handles the synchronization in the background. The high retry count (5 attempts) and removeOnComplete: true flag keep the queue lean while ensuring eventual consistency. Bulk re-indexing after schema changes is straightforward: push thousands of index jobs and let the worker process them at controlled concurrency.

interface IndexJob {
action: 'index' | 'update' | 'delete';
entity: 'product' | 'user' | 'article' | 'order';
id: string;
data?: Record<string, unknown>;
}
const indexQueue = new Queue<IndexJob>('search-index', {
embedded: true,
defaultJobOptions: {
attempts: 5,
backoff: 2000,
removeOnComplete: true,
},
});
const indexWorker = new Worker<IndexJob>('search-index', async (job) => {
const { action, entity, id, data } = job.data;
const indexName = `${entity}s`; // products, users, etc.
switch (action) {
case 'index':
case 'update':
const document = data || await fetchEntity(entity, id);
const transformed = transformForSearch(entity, document);
await elasticsearch.index({
index: indexName,
id,
body: transformed,
});
await job.log(`Indexed ${entity}:${id}`);
break;
case 'delete':
await elasticsearch.delete({
index: indexName,
id,
ignore: [404],
});
await job.log(`Deleted ${entity}:${id} from index`);
break;
}
return { action, entity, id };
}, { embedded: true, concurrency: 20 });
// Index new product
await indexQueue.add('index-product', {
action: 'index',
entity: 'product',
id: 'prod-123',
data: {
name: 'Premium Widget',
description: 'High-quality widget for professionals',
price: 49.99,
categories: ['tools', 'professional'],
tags: ['premium', 'bestseller'],
},
});
// Bulk re-index after schema change
const productIds = await getAllProductIds();
await indexQueue.addBulk(
productIds.map(id => ({
name: 'reindex',
data: { action: 'index', entity: 'product', id },
}))
);

Recurring tasks using cron expressions (server mode) or intervals (embedded mode).

// Server mode: use upsertJobScheduler
await scheduledQueue.upsertJobScheduler('daily-cleanup', {
pattern: '0 3 * * *', // Every day at 3:00 AM
data: { task: 'cleanup', params: { olderThanDays: 30 } },
});
await scheduledQueue.upsertJobScheduler('health-check', {
every: 300000, // Every 5 minutes
data: { task: 'health-check' },
});
import { Queue, Worker } from 'bunqueue/client';
interface ScheduledJob {
task: string;
params?: Record<string, unknown>;
}
const scheduledQueue = new Queue<ScheduledJob>('scheduled', { embedded: true });
// Recurring job using setInterval
const healthCheckInterval = setInterval(async () => {
await scheduledQueue.add('health-check', { task: 'health-check' });
}, 300000); // Every 5 minutes
// Daily cleanup (use node-cron or similar for cron expressions)
import cron from 'node-cron';
cron.schedule('0 3 * * *', async () => {
await scheduledQueue.add('cleanup', {
task: 'cleanup',
params: { olderThanDays: 30 },
});
});
const scheduledWorker = new Worker<ScheduledJob>('scheduled', async (job) => {
const { task, params } = job.data;
switch (task) {
case 'cleanup':
const deleted = await cleanupOldRecords(params?.olderThanDays);
return { deleted };
case 'health-check':
const status = await checkSystemHealth();
if (!status.healthy) {
await alertOps(status);
}
return status;
}
}, { embedded: true });
// Clean up on shutdown
process.on('SIGINT', () => {
clearInterval(healthCheckInterval);
scheduledWorker.close();
});

TENANT A isolated
QueueGroup('tenant-a')
emails
reports
webhooks
  • Isolated namespace
  • Independent pause/resume
  • Separate DLQ
  • Own rate limits
TENANT B isolated
QueueGroup('tenant-b')
emails
reports
webhooks
  • Isolated namespace
  • Independent pause/resume
  • Separate DLQ
  • Own rate limits
SHARED INFRASTRUCTURE SQLite (single DB, prefixed keys)
TENANT A isolated
QueueGroup('tenant-a')
emails
reports
webhooks
SHARED INFRASTRUCTURE SQLite (prefixed keys)
TENANT B isolated
QueueGroup('tenant-b')
emails
reports
webhooks

In a multi-tenant SaaS application, you need queue isolation between tenants. One tenant’s burst of activity should not impact another tenant’s job processing. Pausing queues for maintenance on tenant A should not affect tenant B.

bunqueue’s QueueGroup provides namespace isolation: each tenant gets its own set of queues (emails, reports, webhooks) prefixed with the tenant ID. All queues share the same SQLite database (efficient storage), but operations like pause, resume, and drain are scoped to the tenant. Rate limits and concurrency can be configured per-tenant to enforce resource boundaries.

import { QueueGroup } from 'bunqueue/client';
interface TenantJob {
action: string;
data: Record<string, unknown>;
}
// Create isolated queue groups per tenant
function createTenantQueues(tenantId: string) {
const group = new QueueGroup(tenantId);
return {
emails: group.getQueue<TenantJob>('emails', {
embedded: true,
defaultJobOptions: { attempts: 3 },
}),
reports: group.getQueue<TenantJob>('reports', {
embedded: true,
defaultJobOptions: { timeout: 300000 },
}),
webhooks: group.getQueue<TenantJob>('webhooks', {
embedded: true,
defaultJobOptions: { attempts: 5 },
}),
group,
};
}
// Tenant A
const tenantA = createTenantQueues('tenant-a');
await tenantA.emails.add('send', { action: 'welcome', data: { userId: '123' } });
// Tenant B (completely isolated)
const tenantB = createTenantQueues('tenant-b');
await tenantB.emails.add('send', { action: 'welcome', data: { userId: '456' } });
// Pause all queues for a tenant (maintenance)
tenantA.group.pauseAll();
// Resume
tenantA.group.resumeAll();
// List tenant's queues
console.log(tenantA.group.listQueues()); // ['emails', 'reports', 'webhooks']

Server ModesetRateLimit(100, 60000)Token bucket algorithm
Embedded Modeconcurrency + backoffParallel limit + retry delay

External APIs impose rate limits, and exceeding them results in 429 errors, temporary bans, or degraded service. When your application needs to make thousands of API calls (syncing inventory, importing data, sending notifications), you need controlled throughput.

bunqueue provides two levels of rate control. In embedded mode, worker concurrency acts as a parallel limit — setting concurrency: 10 means at most 10 API calls happen simultaneously. The worker also handles 429 responses by throwing an error, which triggers bunqueue’s automatic retry with backoff. In server mode, setRateLimit() provides precise per-second rate limiting using a token bucket algorithm.

interface ApiJob {
endpoint: string;
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
body?: Record<string, unknown>;
headers?: Record<string, string>;
}
const apiQueue = new Queue<ApiJob>('external-api', {
embedded: true,
defaultJobOptions: {
attempts: 5,
backoff: 10000, // 10 second base backoff for retries
},
});
// Control throughput via worker concurrency
const apiWorker = new Worker<ApiJob>('external-api', async (job) => {
const { endpoint, method, body, headers } = job.data;
const response = await fetch(endpoint, {
method,
headers: {
'Content-Type': 'application/json',
...headers,
},
body: body ? JSON.stringify(body) : undefined,
});
// Handle rate limiting from external API
if (response.status === 429) {
const retryAfter = parseInt(response.headers.get('Retry-After') || '60');
throw new Error(`Rate limited. Retry after ${retryAfter}s`);
}
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
return await response.json();
}, {
embedded: true,
concurrency: 10, // Max 10 parallel requests (controls throughput)
});
// Queue many API calls
for (const item of items) {
await apiQueue.add('sync', {
endpoint: 'https://api.external.com/items',
method: 'POST',
body: item,
headers: { 'Authorization': `Bearer ${token}` },
});
}

Parent-Child (Fan-in)
ORDER FULFILLMENT Parent Job
Validate Inventory Child 1
Validate Payment Child 2
Calculate Shipping Child 3
Parent Executes all children done
Order FulfillmentParent Job
Child 1: Validate Inventory
Child 2: Validate Payment
Child 3: Calculate Shipping
Parent Executesall children done
Sequential Chain
Extract CSV
result
Transform clean
result
Load warehouse
ExtractCSV
result
Transformclean
result
Loadwarehouse
Fan-out / Fan-in (Parallel Merge)
Chunk 1
Chunk 2
Chunk 3
Merge Results
Chunk 1
Chunk 2
Chunk 3
Merge Results

Real-world job processing often involves dependencies: you cannot ship an order before validating inventory and payment. You cannot load data into a warehouse before extracting and transforming it. You cannot merge results before all parallel chunks complete.

bunqueue’s FlowProducer (BullMQ v5 compatible) supports three workflow patterns. Parent-child flows run children in parallel, then execute the parent when all children complete. Sequential chains pass results from one job to the next (A → B → C). Fan-out/fan-in runs parallel jobs and merges results into a final job. Workers can access parent and child results to build data pipelines without external orchestration.

import { FlowProducer, Queue, Worker } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });
// Order fulfillment workflow
const orderFlow = await flow.add({
name: 'fulfill-order',
queueName: 'orders',
data: { orderId: 'ORD-123' },
children: [
// These run in parallel first
{
name: 'validate-inventory',
queueName: 'inventory',
data: { orderId: 'ORD-123' },
},
{
name: 'validate-payment',
queueName: 'payments',
data: { orderId: 'ORD-123' },
},
{
name: 'calculate-shipping',
queueName: 'shipping',
data: { orderId: 'ORD-123' },
},
],
});
// Sequential chain: A → B → C
const pipeline = await flow.addChain([
{ name: 'extract', queueName: 'etl', data: { source: 's3://bucket/data.csv' } },
{ name: 'transform', queueName: 'etl', data: {} }, // Gets result from extract
{ name: 'load', queueName: 'etl', data: { target: 'warehouse' } },
]);
// Parallel batch then merge
const batchFlow = await flow.addBulkThen(
// These run in parallel
[
{ name: 'process-chunk-1', queueName: 'processing', data: { chunk: 1 } },
{ name: 'process-chunk-2', queueName: 'processing', data: { chunk: 2 } },
{ name: 'process-chunk-3', queueName: 'processing', data: { chunk: 3 } },
],
// This runs after all chunks complete
{ name: 'merge-results', queueName: 'processing', data: {} }
);
// Workers can access parent results
const mergeWorker = new Worker('processing', async (job) => {
if (job.name === 'merge-results') {
const childResults = await flow.getParentResults(job);
// childResults = [{ chunk: 1, result: ... }, { chunk: 2, result: ... }, ...]
return combineResults(childResults);
}
// Process individual chunk
return processChunk(job.data.chunk);
}, { embedded: true });

Video transcoding is one of the most resource-intensive background operations. A single video may need to be encoded into 4+ resolutions (1080p, 720p, 480p, 360p), generate thumbnails, and upload all variants to a CDN. This can take minutes to hours depending on the source file size and target formats.

bunqueue’s long timeout support (up to 1 hour), per-resolution progress tracking, and job logging make it ideal for transcoding pipelines. The worker reports progress as each resolution completes, and job logs capture encoding details (resolution, bitrate, output URL) for debugging. An optional webhook URL notifies your application when transcoding finishes.

interface TranscodeJob {
sourceUrl: string;
outputPath: string;
resolutions: Array<{
name: string;
width: number;
height: number;
bitrate: string;
}>;
format: 'mp4' | 'webm' | 'hls';
webhookUrl?: string;
}
const transcodeQueue = new Queue<TranscodeJob>('transcode', {
embedded: true,
defaultJobOptions: {
attempts: 2,
timeout: 3600000, // 1 hour
},
});
const transcodeWorker = new Worker<TranscodeJob>('transcode', async (job) => {
const { sourceUrl, outputPath, resolutions, format, webhookUrl } = job.data;
const results: Record<string, string> = {};
await job.updateProgress(5, 'Downloading source video...');
const sourcePath = await downloadVideo(sourceUrl);
const totalResolutions = resolutions.length;
for (let i = 0; i < totalResolutions; i++) {
const res = resolutions[i];
const progress = 10 + Math.floor((i / totalResolutions) * 85);
await job.updateProgress(progress, `Transcoding ${res.name}...`);
await job.log(`Starting ${res.name}: ${res.width}x${res.height} @ ${res.bitrate}`);
const output = await ffmpeg(sourcePath)
.size(`${res.width}x${res.height}`)
.videoBitrate(res.bitrate)
.format(format)
.output(`${outputPath}/${res.name}.${format}`)
.run();
const url = await uploadToCDN(output, `${outputPath}/${res.name}.${format}`);
results[res.name] = url;
await job.log(`Completed ${res.name}: ${url}`);
}
// Generate thumbnail
await job.updateProgress(98, 'Generating thumbnail...');
const thumbnail = await generateThumbnail(sourcePath);
results.thumbnail = await uploadToCDN(thumbnail, `${outputPath}/thumb.jpg`);
// Webhook notification
if (webhookUrl) {
await fetch(webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ jobId: job.id, status: 'completed', results }),
});
}
await job.updateProgress(100, 'Complete');
return {
outputs: results,
completedAt: new Date().toISOString(),
};
}, { embedded: true, concurrency: 2 });
// Transcode uploaded video
await transcodeQueue.add('transcode-video', {
sourceUrl: 'https://uploads.example.com/raw/video-123.mov',
outputPath: 'videos/123',
resolutions: [
{ name: '1080p', width: 1920, height: 1080, bitrate: '5000k' },
{ name: '720p', width: 1280, height: 720, bitrate: '2500k' },
{ name: '480p', width: 854, height: 480, bitrate: '1000k' },
{ name: '360p', width: 640, height: 360, bitrate: '500k' },
],
format: 'mp4',
webhookUrl: 'https://api.example.com/webhooks/transcode',
});

In production, processes receive shutdown signals (SIGINT, SIGTERM) from container orchestrators, deployment scripts, or manual restarts. A naive shutdown kills workers immediately, leaving jobs in an active state with no acknowledgment. These orphaned jobs become “stalled” and must be recovered on restart.

bunqueue’s graceful shutdown pattern pauses all workers (stop accepting new jobs), waits for active jobs to complete (with a 30-second timeout), closes queue connections, and flushes the SQLite write buffer. This ensures no data loss and no orphaned jobs. If active jobs don’t complete within the timeout, worker.close(true) forces an immediate stop — the stall detector will recover these jobs on restart.

import { Queue, Worker, shutdownManager } from 'bunqueue/client';
// Initialize all queues and workers
const queues = {
emails: new Queue('emails', { embedded: true }),
reports: new Queue('reports', { embedded: true }),
webhooks: new Queue('webhooks', { embedded: true }),
};
const workers = {
emails: new Worker('emails', emailProcessor, { embedded: true, concurrency: 10 }),
reports: new Worker('reports', reportProcessor, { embedded: true, concurrency: 3 }),
webhooks: new Worker('webhooks', webhookProcessor, { embedded: true, concurrency: 20 }),
};
// Graceful shutdown handler
async function gracefulShutdown(signal: string) {
console.log(`\nReceived ${signal}. Starting graceful shutdown...`);
// 1. Stop accepting new jobs
console.log('Pausing all workers...');
for (const worker of Object.values(workers)) {
worker.pause();
}
// 2. Wait for active jobs to complete (with timeout)
console.log('Waiting for active jobs to complete...');
const shutdownTimeout = 30000; // 30 seconds
try {
await Promise.race([
Promise.all(Object.values(workers).map(w => w.close())),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
),
]);
console.log('All workers closed gracefully');
} catch (err) {
console.log('Forcing worker shutdown...');
await Promise.all(Object.values(workers).map(w => w.close(true)));
}
// 3. Close queue connections
console.log('Closing queue connections...');
await Promise.all(Object.values(queues).map(q => q.close()));
// 4. Shutdown the embedded manager (flushes SQLite)
shutdownManager();
console.log('Shutdown complete');
process.exit(0);
}
// Register signal handlers
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Handle uncaught errors
process.on('uncaughtException', async (err) => {
console.error('Uncaught exception:', err);
await gracefulShutdown('uncaughtException');
});
process.on('unhandledRejection', async (reason) => {
console.error('Unhandled rejection:', reason);
await gracefulShutdown('unhandledRejection');
});