Use Cases
Production-ready patterns for common background job scenarios.
Architecture Overview
YOUR APPLICATION

  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │   API    │  │   Cron   │  │ Webhooks │  │  Events  │
  │ Handlers │  │Scheduler │  │ Receiver │  │ Triggers │
  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘
       └─────────────┴───────┬─────┴─────────────┘
                             ▼
                        QUEUE LAYER
  ┌────────┐  ┌─────────┐  ┌──────────┐  ┌──────────┐
  │ emails │  │ reports │  │ webhooks │  │ payments │  ...
  └────────┘  └─────────┘  └──────────┘  └──────────┘
                             │
                             ▼
                        WORKER POOL
  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │Worker x3 │  │Worker x2 │  │Worker x5 │  │Worker x3 │
  └──────────┘  └──────────┘  └──────────┘  └──────────┘
        │                 │                 │
        ▼                 ▼                 ▼
  ┌──────────┐    ┌───────────────┐    ┌──────────┐
  │   DLQ    │    │    SQLite     │    │  Events  │
  │ (Failed) │    │ (Persistence) │    │  (Hooks) │
  └──────────┘    └───────────────┘    └──────────┘

Job Lifecycle

┌────────┐     ┌────────┐     ┌───────────┐     ┌────────┐
│ QUEUED │────▶│ ACTIVE │────▶│ COMPLETED │     │ FAILED │
└────────┘     └───┬────┘     └───────────┘     └───┬────┘
                   │                                ▲
                   │          ┌──────────┐          │
                   └─────────▶│ RETRYING │──────────┘
                              └────┬─────┘
                                   │ max attempts
                                   ▼
                               ┌───────┐
                               │  DLQ  │
                               └───────┘
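These transitions are observable from code via the event hooks shown in the architecture above. A minimal sketch, assuming BullMQ-compatible worker events (verify the event names against your bunqueue version):

import { Worker } from 'bunqueue/client';

const worker = new Worker('emails', async (job) => ({ ok: true }), { embedded: true });

// Assumed BullMQ-style event names, not confirmed bunqueue API.
worker.on('completed', (job) => console.log(`job ${job.id}: COMPLETED`));
worker.on('failed', (job, err) => console.error(`job ${job?.id}: FAILED (${err.message})`));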
Email Delivery System
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Request   │───▶│    Queue    │───▶│   Worker    │───▶│    SMTP/    │
│ (API/Cron)  │    │  (emails)   │    │  (render +  │    │  SendGrid   │
└─────────────┘    └─────────────┘    │    send)    │    └─────────────┘
                                      └──────┬──────┘
                ┌────────────────────────────┼────────────────────────────┐
                ▼                            ▼                            ▼
          ┌───────────┐               ┌───────────┐               ┌───────────┐
          │  Success  │               │   Retry   │               │    DLQ    │
          │ (logged)  │               │ (backoff) │               │  (alert)  │
          └───────────┘               └───────────┘               └───────────┘

Reliable email sending with retries, templates, and delivery tracking.
import { Queue, Worker, shutdownManager } from 'bunqueue/client';
interface EmailJob {
  to: string;
  template: 'welcome' | 'reset-password' | 'invoice' | 'notification';
  data: Record<string, unknown>;
  attachments?: { name: string; url: string }[];
}

const emailQueue = new Queue<EmailJob>('emails', {
  embedded: true,
  defaultJobOptions: {
    attempts: 5,
    backoff: 2000, // Exponential backoff
    removeOnComplete: true,
  },
});

const emailWorker = new Worker<EmailJob>('emails', async (job) => {
  const { to, template, data, attachments } = job.data;

  await job.updateProgress(10, 'Loading template...');
  const html = await renderTemplate(template, data);

  await job.updateProgress(30, 'Preparing attachments...');
  const files = attachments ? await downloadAttachments(attachments) : [];

  await job.updateProgress(50, 'Sending email...');
  const result = await sendEmail({ to, html, attachments: files });

  await job.log(`Delivered to ${to} via ${result.provider}`);
  await job.updateProgress(100, 'Delivered');

  return {
    messageId: result.messageId,
    provider: result.provider,
    deliveredAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 10 });
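The worker above leans on application helpers (renderTemplate, downloadAttachments, sendEmail) that are not part of bunqueue. As one possible shape, here is a minimal sendEmail sketch backed by SendGrid's v3 Mail Send endpoint; the sender address and subject handling are placeholders:

// Hypothetical helper, not a bunqueue API. A non-2xx response throws,
// which hands control to the worker's retry/backoff policy.
async function sendEmail(opts: { to: string; html: string; attachments: unknown[] }) {
  const res = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: opts.to }] }],
      from: { email: 'noreply@example.com' }, // placeholder sender
      subject: 'Notification',                // placeholder subject
      content: [{ type: 'text/html', value: opts.html }],
    }),
  });
  if (!res.ok) throw new Error(`SendGrid error: ${res.status}`);
  return { provider: 'sendgrid', messageId: res.headers.get('X-Message-Id') ?? '' };
}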
// Send welcome email
await emailQueue.add('welcome', {
  to: 'user@example.com',
  template: 'welcome',
  data: { name: 'John', activationLink: 'https://...' },
});

// Send bulk newsletter
const subscribers = await getSubscribers();
await emailQueue.addBulk(
  subscribers.map(sub => ({
    name: 'newsletter',
    data: {
      to: sub.email,
      template: 'notification',
      data: { name: sub.name, content: newsletterContent },
    },
  }))
);

Report Generation Pipeline
┌──────────┐    ┌───────────┐    ┌───────────────────────────────────────┐
│   User   │───▶│   Queue   │───▶│ Worker                                │
│ Request  │    │ (reports) │    │  Query DB ▶ Transform ▶ Gen File ▶ S3 │
└──────────┘    └───────────┘    │    10%        40%        70%      90% │
                                 └───────────────────┬───────────────────┘
                                                     ▼
                                               ┌──────────┐
                                               │  Notify  │
                                               │ (email)  │
                                               └──────────┘

Long-running report generation with progress tracking and file storage.
interface ReportJob {
  type: 'sales' | 'inventory' | 'analytics' | 'audit';
  format: 'pdf' | 'xlsx' | 'csv';
  filters: {
    dateRange: { start: string; end: string };
    departments?: string[];
    regions?: string[];
  };
  requestedBy: string;
  notifyEmail: string;
}

const reportQueue = new Queue<ReportJob>('reports', {
  embedded: true,
  defaultJobOptions: {
    attempts: 2,
    timeout: 600000, // 10 minutes max
  },
});

const reportWorker = new Worker<ReportJob>('reports', async (job) => {
  const { type, format, filters, requestedBy, notifyEmail } = job.data;

  // Phase 1: Query data
  await job.updateProgress(10, 'Fetching data...');
  await job.log(`Report type: ${type}, format: ${format}`);
  const data = await fetchReportData(type, filters);

  // Phase 2: Process data
  await job.updateProgress(40, `Processing ${data.length} records...`);
  const processed = await processData(data, type);

  // Phase 3: Generate file
  await job.updateProgress(70, `Generating ${format.toUpperCase()}...`);
  const file = await generateFile(processed, format);

  // Phase 4: Upload to storage
  await job.updateProgress(90, 'Uploading to storage...');
  const url = await uploadToS3(file, `reports/${job.id}.${format}`);

  // Phase 5: Notify user
  await job.updateProgress(95, 'Sending notification...');
  await sendEmail({
    to: notifyEmail,
    template: 'report-ready',
    data: { reportUrl: url, type, format },
  });

  await job.updateProgress(100, 'Complete');

  return {
    url,
    size: file.size,
    records: data.length,
    generatedAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 3 });
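The progress written by updateProgress can be read back for a status endpoint. A sketch, assuming a BullMQ-compatible Queue.getJob() and Job.progress field (verify both against your bunqueue version):

// Hypothetical status helper for an API handler.
async function getReportStatus(jobId: string) {
  const job = await reportQueue.getJob(jobId);
  if (!job) return { status: 'not_found' };
  return { id: job.id, progress: job.progress };
}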
// Generate monthly sales report
await reportQueue.add('monthly-sales', {
  type: 'sales',
  format: 'xlsx',
  filters: {
    dateRange: { start: '2024-01-01', end: '2024-01-31' },
    regions: ['US', 'EU'],
  },
  requestedBy: 'user-123',
  notifyEmail: 'manager@company.com',
});

Webhook Delivery with Circuit Breaker
┌──────────┐    ┌────────────┐    ┌──────────┐    ┌──────────────┐
│  Event   │───▶│   Queue    │───▶│  Worker  │───▶│   Partner    │
│ (order,  │    │ (webhooks) │    │  (HTTP   │    │   Endpoint   │
│  user)   │    └────────────┘    │   POST)  │    └──────────────┘
└──────────┘                      └────┬─────┘
         ┌─────────────────────────────┼─────────────────────────────┐
         ▼                             ▼                             ▼
  ┌─────────────┐              ┌─────────────┐              ┌─────────────┐
  │   Success   │              │    Retry    │              │     DLQ     │
  │    (200)    │              │  (5xx/429)  │              │  (8 fails)  │
  └─────────────┘              │ exp backoff │              │ auto-retry  │
                               │ 5s▶10s▶20s  │              │   hourly    │
                               └─────────────┘              └─────────────┘

Reliable webhook delivery with automatic retries and failure tracking.
interface WebhookJob {
  endpoint: string;
  event: string;
  payload: Record<string, unknown>;
  headers?: Record<string, string>;
  signature?: string;
}

const webhookQueue = new Queue<WebhookJob>('webhooks', {
  embedded: true,
  defaultJobOptions: {
    attempts: 8,
    backoff: 5000, // Exponential: 5s, 10s, 20s, 40s...
  },
});

// Configure DLQ for permanent failures
webhookQueue.setDlqConfig({
  autoRetry: true,
  autoRetryInterval: 3600000, // Retry DLQ every hour
  maxAutoRetries: 3,
  maxAge: 604800000, // Keep for 7 days
});

const webhookWorker = new Worker<WebhookJob>('webhooks', async (job) => {
  const { endpoint, event, payload, headers, signature } = job.data;

  await job.log(`Delivering ${event} to ${endpoint}`);

  const response = await fetch(endpoint, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Webhook-Event': event,
      'X-Webhook-Signature': signature || '',
      'X-Webhook-Delivery': job.id,
      ...headers,
    },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(30000), // 30s timeout
  });

  if (!response.ok) {
    const body = await response.text();
    throw new Error(`HTTP ${response.status}: ${body.slice(0, 200)}`);
  }

  return {
    status: response.status,
    deliveredAt: new Date().toISOString(),
    attempt: job.attemptsMade,
  };
}, { embedded: true, concurrency: 20 });
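The dispatch below signs the payload. generateHmacSignature is application code, not a bunqueue API; a common shape is an HMAC-SHA256 over the JSON body, hex-encoded:

import { createHmac } from 'node:crypto';

// Hypothetical helper; receivers recompute the HMAC to verify authenticity.
function generateHmacSignature(payload: Record<string, unknown>, secret: string): string {
  return createHmac('sha256', secret).update(JSON.stringify(payload)).digest('hex');
}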
// Dispatch webhook (define the payload first so it can be signed)
const payload = {
  orderId: 'ORD-123',
  total: 99.99,
  items: [{ sku: 'ITEM-1', qty: 2 }],
};

await webhookQueue.add('order.created', {
  endpoint: 'https://partner.com/webhooks',
  event: 'order.created',
  payload,
  signature: generateHmacSignature(payload, secret), // secret: shared signing key
});

Image Processing Pipeline
┌──────────┐    ┌──────────┐    ┌─────────────────────────────────────┐
│  Upload  │───▶│  Queue   │───▶│ Worker                              │
│   (S3)   │    │ (images) │    │  Download Source                    │
└──────────┘    └──────────┘    │         │                           │
                                │         ▼                           │
                                │  Sharp Process:                     │
                                │    thumb 150px    card 400px        │
                                │    full 1200px    og 1200px         │
                                │         │                           │
                                │         ▼                           │
                                │    Upload CDN                       │
                                └─────────────────────────────────────┘

Multi-stage image processing with variants and CDN upload.
import sharp from 'sharp';

interface ImageJob {
  sourceUrl: string;
  variants: Array<{
    name: string;
    width: number;
    height: number;
    format: 'webp' | 'avif' | 'jpeg';
    quality: number;
  }>;
  destination: string;
  metadata?: Record<string, string>;
}

const imageQueue = new Queue<ImageJob>('images', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    timeout: 120000, // 2 minutes
  },
});

const imageWorker = new Worker<ImageJob>('images', async (job) => {
  const { sourceUrl, variants, destination, metadata } = job.data;
  const results: Record<string, string> = {};

  await job.updateProgress(5, 'Downloading source image...');
  const source = await downloadImage(sourceUrl);

  const totalVariants = variants.length;
  for (let i = 0; i < totalVariants; i++) {
    const variant = variants[i];
    const progress = 10 + Math.floor((i / totalVariants) * 80);

    await job.updateProgress(progress, `Processing ${variant.name}...`);

    // Resize and convert
    const processed = await sharp(source)
      .resize(variant.width, variant.height, { fit: 'cover' })
      .toFormat(variant.format, { quality: variant.quality })
      .toBuffer();

    // Upload to CDN
    const path = `${destination}/${variant.name}.${variant.format}`;
    const url = await uploadToCDN(processed, path, metadata);
    results[variant.name] = url;

    await job.log(`Uploaded ${variant.name}: ${url}`);
  }

  await job.updateProgress(100, 'Complete');

  return {
    variants: results,
    processedAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 5 });
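downloadImage and uploadToCDN are assumed application helpers. Fetching the source into a Buffer that sharp accepts can be as simple as this sketch:

// Hypothetical helper: fetch the source image into a Buffer for sharp.
async function downloadImage(url: string): Promise<Buffer> {
  const res = await fetch(url);
  if (!res.ok) throw new Error(`Download failed: HTTP ${res.status}`);
  return Buffer.from(await res.arrayBuffer());
}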
// Process product image
await imageQueue.add('product-image', {
  sourceUrl: 'https://uploads.example.com/raw/product-123.jpg',
  variants: [
    { name: 'thumbnail', width: 150, height: 150, format: 'webp', quality: 80 },
    { name: 'card', width: 400, height: 300, format: 'webp', quality: 85 },
    { name: 'full', width: 1200, height: 900, format: 'webp', quality: 90 },
    { name: 'og', width: 1200, height: 630, format: 'jpeg', quality: 85 },
  ],
  destination: 'products/123',
  metadata: { 'Cache-Control': 'public, max-age=31536000' },
});

Data Export/Import System
Large dataset export with streaming and chunked processing.
interface ExportJob {
  type: 'users' | 'orders' | 'products' | 'transactions';
  format: 'csv' | 'json' | 'parquet';
  filters?: Record<string, unknown>;
  notifyEmail: string;
  chunkSize?: number;
}

const exportQueue = new Queue<ExportJob>('exports', {
  embedded: true,
  defaultJobOptions: {
    attempts: 2,
    timeout: 1800000, // 30 minutes
  },
});

const exportWorker = new Worker<ExportJob>('exports', async (job) => {
  const { type, format, filters, notifyEmail, chunkSize = 10000 } = job.data;

  await job.updateProgress(5, 'Counting records...');
  const totalCount = await countRecords(type, filters);
  const totalChunks = Math.ceil(totalCount / chunkSize);

  await job.log(`Exporting ${totalCount} ${type} in ${totalChunks} chunks`);

  // Create temporary file
  const tempFile = await createTempFile(`export-${job.id}.${format}`);
  const writer = await createWriter(tempFile, format);

  // Process in chunks
  for (let chunk = 0; chunk < totalChunks; chunk++) {
    const progress = 10 + Math.floor((chunk / totalChunks) * 80);
    await job.updateProgress(progress, `Processing chunk ${chunk + 1}/${totalChunks}...`);

    const records = await fetchRecords(type, filters, {
      offset: chunk * chunkSize,
      limit: chunkSize,
    });

    await writer.write(records);
  }

  await writer.close();

  // Upload to storage
  await job.updateProgress(95, 'Uploading file...');
  const url = await uploadToS3(tempFile, `exports/${job.id}.${format}`);

  // Send notification
  await sendEmail({
    to: notifyEmail,
    template: 'export-ready',
    data: { downloadUrl: url, type, recordCount: totalCount },
  });

  await job.updateProgress(100, 'Complete');

  return {
    url,
    recordCount: totalCount,
    format,
    size: await getFileSize(tempFile),
  };
}, { embedded: true, concurrency: 2 });
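createTempFile and createWriter are assumed helpers. A minimal CSV-only sketch using Bun's FileSink (the escaping here is naive; use a real CSV library for production data):

import { mkdtemp } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helpers, not bunqueue APIs.
async function createTempFile(name: string): Promise<string> {
  return join(await mkdtemp(join(tmpdir(), 'export-')), name);
}

function createCsvWriter(path: string) {
  const sink = Bun.file(path).writer();
  let headerWritten = false;
  return {
    async write(records: Record<string, unknown>[]) {
      if (records.length === 0) return;
      if (!headerWritten) {
        sink.write(Object.keys(records[0]).join(',') + '\n');
        headerWritten = true;
      }
      for (const r of records) {
        // JSON.stringify gives quick-and-dirty quoting only.
        sink.write(Object.values(r).map(v => JSON.stringify(v ?? '')).join(',') + '\n');
      }
      await sink.flush();
    },
    async close() {
      await sink.end();
    },
  };
}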
// Export all orders for accounting
await exportQueue.add('accounting-export', {
  type: 'orders',
  format: 'csv',
  filters: {
    status: 'completed',
    dateRange: { start: '2024-01-01', end: '2024-12-31' },
  },
  notifyEmail: 'accounting@company.com',
  chunkSize: 5000,
});

Multi-Channel Notification System
┌──────────┐    ┌───────────────┐    ┌─────────────────────────────────────┐
│  Event   │───▶│     Queue     │───▶│ Worker                              │
│(security,│    │(notifications)│    │   Load User Preferences             │
│  order)  │    └───────────────┘    │              │                      │
└──────────┘                         │              ▼                      │
                                     │      Fan Out by Channel             │
                                     │    ┌───────┬───────┬────────┐       │
                                     │    ▼       ▼       ▼        ▼       │
                                     │  Email   Push     SMS    In-App     │
                                     │          (FCM)  (Twilio)            │
                                     └─────────────────────────────────────┘

Send notifications across multiple channels with preferences.
interface NotificationJob {
  userId: string;
  type: 'order_update' | 'promotion' | 'security_alert' | 'reminder';
  title: string;
  body: string;
  data?: Record<string, unknown>;
  channels?: ('email' | 'push' | 'sms' | 'in_app')[];
}

const notificationQueue = new Queue<NotificationJob>('notifications', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    backoff: 1000,
  },
});

const notificationWorker = new Worker<NotificationJob>('notifications', async (job) => {
  const { userId, type, title, body, data, channels } = job.data;

  // Get user preferences
  const user = await getUser(userId);
  const prefs = await getNotificationPreferences(userId);

  // Determine channels to use
  const activeChannels = channels || getDefaultChannels(type);
  const allowedChannels = activeChannels.filter(ch => prefs[ch] !== false);

  const results: Record<string, boolean> = {};

  for (const channel of allowedChannels) {
    try {
      switch (channel) {
        case 'email':
          await sendEmail({ to: user.email, subject: title, body });
          break;
        case 'push':
          await sendPushNotification(user.deviceTokens, { title, body, data });
          break;
        case 'sms':
          await sendSMS(user.phone, `${title}: ${body}`);
          break;
        case 'in_app':
          await createInAppNotification(userId, { type, title, body, data });
          break;
      }
      results[channel] = true;
      await job.log(`Sent via ${channel}`);
    } catch (err) {
      results[channel] = false;
      await job.log(`Failed ${channel}: ${(err as Error).message}`);
    }
  }

  return {
    userId,
    type,
    channels: results,
    sentAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 15 });
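getDefaultChannels is an assumed helper; the mapping below is illustrative of how notification types might fan out when the caller does not pick channels explicitly:

// Hypothetical defaults per notification type.
function getDefaultChannels(type: NotificationJob['type']): NonNullable<NotificationJob['channels']> {
  switch (type) {
    case 'security_alert': return ['email', 'push', 'sms', 'in_app'];
    case 'order_update':   return ['email', 'push', 'in_app'];
    case 'promotion':      return ['email', 'in_app'];
    case 'reminder':       return ['push', 'in_app'];
  }
}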
// Security alert - high priority, all channels
await notificationQueue.add('security-alert', {
  userId: 'user-123',
  type: 'security_alert',
  title: 'New login detected',
  body: 'A new device logged into your account from San Francisco, CA',
  data: { ip: '192.168.1.1', device: 'Chrome on MacOS' },
  channels: ['email', 'push', 'sms'],
}, { priority: 100 });

Payment Processing Queue
┌──────────┐    ┌────────────┐    ┌───────────────────────────────────┐
│  Order   │───▶│   Queue    │───▶│ Worker                            │
│ Checkout │    │ (payments) │    │   Check Idempotency Key           │
└──────────┘    └────────────┘    │            │                      │
                                  │      Already done?                │
                                  │     no │       │ yes              │
                                  │        ▼       ▼                  │
                                  │   Stripe      Return              │
                                  │   Charge      Cached              │
                                  │        │                          │
                                  │        ▼                          │
                                  │   Record Txn + Audit Log          │
                                  └───────────────────────────────────┘
⚠️ DLQ: Manual review required (no auto-retry)

Secure payment processing with idempotency and audit logging.
interface PaymentJob {
  orderId: string;
  amount: number;
  currency: string;
  customerId: string;
  paymentMethodId: string;
  metadata?: Record<string, string>;
  idempotencyKey: string;
}

const paymentQueue = new Queue<PaymentJob>('payments', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    backoff: 5000,
    timeout: 60000,
  },
});

// Configure strict DLQ for failed payments
paymentQueue.setDlqConfig({
  autoRetry: false, // Manual review required
  maxAge: 2592000000, // Keep for 30 days
});

const paymentWorker = new Worker<PaymentJob>('payments', async (job) => {
  const { orderId, amount, currency, customerId, paymentMethodId, idempotencyKey } = job.data;

  await job.log(`Processing payment for order ${orderId}`);

  // Check idempotency
  const existing = await checkIdempotency(idempotencyKey);
  if (existing) {
    await job.log('Payment already processed (idempotent)');
    return existing;
  }

  // Create payment intent (in stripe-node, the idempotency key is a
  // request option, not a payment intent parameter)
  await job.updateProgress(20, 'Creating payment intent...');
  const intent = await stripe.paymentIntents.create({
    amount: Math.round(amount * 100),
    currency,
    customer: customerId,
    payment_method: paymentMethodId,
    confirm: true,
  }, { idempotencyKey });

  if (intent.status !== 'succeeded') {
    throw new Error(`Payment failed: ${intent.status}`);
  }

  // Record transaction
  await job.updateProgress(80, 'Recording transaction...');
  await recordTransaction({
    orderId,
    paymentIntentId: intent.id,
    amount,
    currency,
    status: 'completed',
  });

  // Audit log
  await createAuditLog({
    action: 'payment.completed',
    orderId,
    amount,
    paymentIntentId: intent.id,
    jobId: job.id,
  });

  await job.updateProgress(100, 'Complete');

  return {
    paymentIntentId: intent.id,
    status: 'completed',
    processedAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 5 });
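checkIdempotency is assumed application code. One way to back it with bun:sqlite (table name and schema are illustrative); after a successful charge the worker would persist its return value under the same key:

import { Database } from 'bun:sqlite';

// Hypothetical idempotency store, not a bunqueue API.
const idem = new Database('idempotency.sqlite');
idem.run('CREATE TABLE IF NOT EXISTS payment_results (key TEXT PRIMARY KEY, result TEXT NOT NULL)');

async function checkIdempotency(key: string): Promise<Record<string, unknown> | null> {
  const row = idem.query('SELECT result FROM payment_results WHERE key = ?').get(key) as
    { result: string } | null;
  return row ? JSON.parse(row.result) : null;
}

async function saveIdempotentResult(key: string, result: Record<string, unknown>) {
  idem.run('INSERT OR REPLACE INTO payment_results (key, result) VALUES (?, ?)',
    [key, JSON.stringify(result)]);
}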
// Process order payment - CRITICAL: use durable for guaranteed persistence
const orderId = 'ORD-123';

await paymentQueue.add('charge', {
  orderId,
  amount: 99.99,
  currency: 'usd',
  customerId: 'cus_xxx',
  paymentMethodId: 'pm_xxx',
  idempotencyKey: `order-${orderId}-payment`,
}, { durable: true }); // Immediate disk write - no data loss on crash

Search Index Synchronization
Keep search indices in sync with database changes.
interface IndexJob {
  action: 'index' | 'update' | 'delete';
  entity: 'product' | 'user' | 'article' | 'order';
  id: string;
  data?: Record<string, unknown>;
}

const indexQueue = new Queue<IndexJob>('search-index', {
  embedded: true,
  defaultJobOptions: {
    attempts: 5,
    backoff: 2000,
    removeOnComplete: true,
  },
});

const indexWorker = new Worker<IndexJob>('search-index', async (job) => {
  const { action, entity, id, data } = job.data;
  const indexName = `${entity}s`; // products, users, etc.

  switch (action) {
    case 'index':
    case 'update': {
      const document = data || await fetchEntity(entity, id);
      const transformed = transformForSearch(entity, document);
      await elasticsearch.index({
        index: indexName,
        id,
        body: transformed,
      });
      await job.log(`Indexed ${entity}:${id}`);
      break;
    }

    case 'delete': {
      await elasticsearch.delete({
        index: indexName,
        id,
        ignore: [404],
      });
      await job.log(`Deleted ${entity}:${id} from index`);
      break;
    }
  }

  return { action, entity, id };
}, { embedded: true, concurrency: 20 });
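To actually keep the index in sync, enqueue an IndexJob from the same code path that writes to the database. A sketch; db and its upsert call are assumed application code, not bunqueue APIs:

// Assumed data-layer handle, declared only so the sketch typechecks.
declare const db: { products: { upsert(p: Record<string, unknown>): Promise<void> } };

async function saveProduct(product: { id: string } & Record<string, unknown>) {
  await db.products.upsert(product); // write to the source of truth first
  await indexQueue.add('index-product', {
    action: 'update',
    entity: 'product',
    id: product.id,
    data: product,
  });
}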
// Index new product
await indexQueue.add('index-product', {
  action: 'index',
  entity: 'product',
  id: 'prod-123',
  data: {
    name: 'Premium Widget',
    description: 'High-quality widget for professionals',
    price: 49.99,
    categories: ['tools', 'professional'],
    tags: ['premium', 'bestseller'],
  },
});

// Bulk re-index after schema change
const productIds = await getAllProductIds();
await indexQueue.addBulk(
  productIds.map(id => ({
    name: 'reindex',
    data: { action: 'index', entity: 'product', id },
  }))
);

Scheduled Tasks with Cron
Recurring tasks using cron expressions (server mode) or intervals (embedded mode).
Server Mode (with Cron)
// Server mode: use upsertJobScheduler
await scheduledQueue.upsertJobScheduler('daily-cleanup', {
  pattern: '0 3 * * *', // Every day at 3:00 AM
  data: { task: 'cleanup', params: { olderThanDays: 30 } },
});

await scheduledQueue.upsertJobScheduler('health-check', {
  every: 300000, // Every 5 minutes
  data: { task: 'health-check' },
});

Embedded Mode (with setInterval)
import { Queue, Worker } from 'bunqueue/client';
import cron from 'node-cron';

interface ScheduledJob {
  task: string;
  params?: Record<string, unknown>;
}

const scheduledQueue = new Queue<ScheduledJob>('scheduled', { embedded: true });

// Recurring job using setInterval
const healthCheckInterval = setInterval(async () => {
  await scheduledQueue.add('health-check', { task: 'health-check' });
}, 300000); // Every 5 minutes

// Daily cleanup (use node-cron or similar for cron expressions)
cron.schedule('0 3 * * *', async () => {
  await scheduledQueue.add('cleanup', {
    task: 'cleanup',
    params: { olderThanDays: 30 },
  });
});

const scheduledWorker = new Worker<ScheduledJob>('scheduled', async (job) => {
  const { task, params } = job.data;

  switch (task) {
    case 'cleanup': {
      const deleted = await cleanupOldRecords(params?.olderThanDays);
      return { deleted };
    }

    case 'health-check': {
      const status = await checkSystemHealth();
      if (!status.healthy) {
        await alertOps(status);
      }
      return status;
    }
  }
}, { embedded: true });

// Clean up on shutdown
process.on('SIGINT', () => {
  clearInterval(healthCheckInterval);
  scheduledWorker.close();
});

Multi-Tenant Queue Isolation
MULTI-TENANT ARCHITECTURE

  ┌────────────────────────────────┐  ┌────────────────────────────────┐
  │           TENANT A             │  │           TENANT B             │
  │    QueueGroup('tenant-a')      │  │    QueueGroup('tenant-b')      │
  │    emails, reports, ...        │  │    emails, reports, ...        │
  │                                │  │                                │
  │    • Isolated namespace        │  │    • Isolated namespace        │
  │    • Independent pause/resume  │  │    • Independent pause/resume  │
  │    • Separate DLQ              │  │    • Separate DLQ              │
  │    • Own rate limits           │  │    • Own rate limits           │
  └────────────────────────────────┘  └────────────────────────────────┘

  ┌──────────────────────────────────────────────────────────────────┐
  │                      SHARED INFRASTRUCTURE                       │
  │                 SQLite (single DB, prefixed keys)                │
  └──────────────────────────────────────────────────────────────────┘

Isolated queues per tenant with resource limits.
import { QueueGroup } from 'bunqueue/client';
interface TenantJob {
  action: string;
  data: Record<string, unknown>;
}

// Create isolated queue groups per tenant
function createTenantQueues(tenantId: string) {
  const group = new QueueGroup(tenantId);

  return {
    emails: group.getQueue<TenantJob>('emails', {
      embedded: true,
      defaultJobOptions: { attempts: 3 },
    }),
    reports: group.getQueue<TenantJob>('reports', {
      embedded: true,
      defaultJobOptions: { timeout: 300000 },
    }),
    webhooks: group.getQueue<TenantJob>('webhooks', {
      embedded: true,
      defaultJobOptions: { attempts: 5 },
    }),
    group,
  };
}

// Tenant A
const tenantA = createTenantQueues('tenant-a');
await tenantA.emails.add('send', { action: 'welcome', data: { userId: '123' } });

// Tenant B (completely isolated)
const tenantB = createTenantQueues('tenant-b');
await tenantB.emails.add('send', { action: 'welcome', data: { userId: '456' } });

// Pause all queues for a tenant (maintenance)
tenantA.group.pauseAll();

// Resume
tenantA.group.resumeAll();

// List tenant's queues
console.log(tenantA.group.listQueues()); // ['emails', 'reports', 'webhooks']

Rate-Limited API Calls
THROUGHPUT CONTROL

  SERVER MODE:    setRateLimit(100, 60000)  →  token bucket algorithm
  EMBEDDED MODE:  concurrency + backoff     →  parallel limit + retry delay

External API integration with controlled throughput.
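In server mode, the cap in the table above is a single call on the queue. A sketch; the setRateLimit signature is taken from the table and should be verified against your bunqueue version:

// Server mode sketch: no `embedded` flag, so the queue talks to the server.
const serverQueue = new Queue('external-api');
await serverQueue.setRateLimit(100, 60000); // ≤100 jobs per 60s window (token bucket)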
interface ApiJob {
  endpoint: string;
  method: 'GET' | 'POST' | 'PUT' | 'DELETE';
  body?: Record<string, unknown>;
  headers?: Record<string, string>;
}

const apiQueue = new Queue<ApiJob>('external-api', {
  embedded: true,
  defaultJobOptions: {
    attempts: 5,
    backoff: 10000, // 10 second base backoff for retries
  },
});

// Control throughput via worker concurrency
const apiWorker = new Worker<ApiJob>('external-api', async (job) => {
  const { endpoint, method, body, headers } = job.data;

  const response = await fetch(endpoint, {
    method,
    headers: {
      'Content-Type': 'application/json',
      ...headers,
    },
    body: body ? JSON.stringify(body) : undefined,
  });

  // Handle rate limiting from external API
  if (response.status === 429) {
    const retryAfter = parseInt(response.headers.get('Retry-After') || '60');
    throw new Error(`Rate limited. Retry after ${retryAfter}s`);
  }

  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }

  return await response.json();
}, {
  embedded: true,
  concurrency: 10, // Max 10 parallel requests (controls throughput)
});

// Queue many API calls
for (const item of items) {
  await apiQueue.add('sync', {
    endpoint: 'https://api.external.com/items',
    method: 'POST',
    body: item,
    headers: { 'Authorization': `Bearer ${token}` },
  });
}

Parent-Child Job Workflows
                     ORDER FULFILLMENT
                       (Parent Job)
                            │
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
   │  Validate   │   │  Validate   │   │  Calculate  │
   │  Inventory  │   │   Payment   │   │  Shipping   │
   │  (Child 1)  │   │  (Child 2)  │   │  (Child 3)  │
   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
          └─────────────────┼─────────────────┘
                            ▼
                 ┌──────────────────────┐
                 │   Parent Executes    │
                 │ (all children done)  │
                 └──────────────────────┘

SEQUENTIAL CHAIN: Extract → Transform → Load

  ┌───────────┐          ┌───────────┐          ┌────────────┐
  │  Extract  │─────────▶│ Transform │─────────▶│    Load    │
  │   (CSV)   │  result  │  (clean)  │  result  │(warehouse) │
  └───────────┘          └───────────┘          └────────────┘

PARALLEL THEN MERGE

  ┌───────────┐
  │  Chunk 1  │──────┐
  └───────────┘      │
  ┌───────────┐      │      ┌───────────┐
  │  Chunk 2  │──────┼─────▶│   Merge   │
  └───────────┘      │      │  Results  │
  ┌───────────┐      │      └───────────┘
  │  Chunk 3  │──────┘
  └───────────┘

Complex workflows with dependencies using FlowProducer.
import { FlowProducer, Queue, Worker } from 'bunqueue/client';
const flow = new FlowProducer();
// Order fulfillment workflow
const orderFlow = await flow.add({
  name: 'fulfill-order',
  queueName: 'orders',
  data: { orderId: 'ORD-123' },
  children: [
    // These run in parallel first
    {
      name: 'validate-inventory',
      queueName: 'inventory',
      data: { orderId: 'ORD-123' },
    },
    {
      name: 'validate-payment',
      queueName: 'payments',
      data: { orderId: 'ORD-123' },
    },
    {
      name: 'calculate-shipping',
      queueName: 'shipping',
      data: { orderId: 'ORD-123' },
    },
  ],
});

// Sequential chain: A → B → C
const pipeline = await flow.addChain([
  { name: 'extract', queueName: 'etl', data: { source: 's3://bucket/data.csv' } },
  { name: 'transform', queueName: 'etl', data: {} }, // Gets result from extract
  { name: 'load', queueName: 'etl', data: { target: 'warehouse' } },
]);

// Parallel batch then merge
const batchFlow = await flow.addBulkThen(
  // These run in parallel
  [
    { name: 'process-chunk-1', queueName: 'processing', data: { chunk: 1 } },
    { name: 'process-chunk-2', queueName: 'processing', data: { chunk: 2 } },
    { name: 'process-chunk-3', queueName: 'processing', data: { chunk: 3 } },
  ],
  // This runs after all chunks complete
  { name: 'merge-results', queueName: 'processing', data: {} }
);

// Workers can access parent results
const mergeWorker = new Worker('processing', async (job) => {
  if (job.name === 'merge-results') {
    const childResults = await flow.getParentResults(job);
    // childResults = [{ chunk: 1, result: ... }, { chunk: 2, result: ... }, ...]
    return combineResults(childResults);
  }
  // Process individual chunk
  return processChunk(job.data.chunk);
}, { embedded: true });

Video Transcoding Pipeline
Multi-resolution video transcoding with progress tracking.
import ffmpeg from 'fluent-ffmpeg';

interface TranscodeJob {
  sourceUrl: string;
  outputPath: string;
  resolutions: Array<{
    name: string;
    width: number;
    height: number;
    bitrate: string;
  }>;
  format: 'mp4' | 'webm' | 'hls';
  webhookUrl?: string;
}

const transcodeQueue = new Queue<TranscodeJob>('transcode', {
  embedded: true,
  defaultJobOptions: {
    attempts: 2,
    timeout: 3600000, // 1 hour
  },
});

const transcodeWorker = new Worker<TranscodeJob>('transcode', async (job) => {
  const { sourceUrl, outputPath, resolutions, format, webhookUrl } = job.data;
  const results: Record<string, string> = {};

  await job.updateProgress(5, 'Downloading source video...');
  const sourcePath = await downloadVideo(sourceUrl);

  const totalResolutions = resolutions.length;
  for (let i = 0; i < totalResolutions; i++) {
    const res = resolutions[i];
    const progress = 10 + Math.floor((i / totalResolutions) * 85);

    await job.updateProgress(progress, `Transcoding ${res.name}...`);
    await job.log(`Starting ${res.name}: ${res.width}x${res.height} @ ${res.bitrate}`);

    // fluent-ffmpeg's run() is event-based, so wrap it in a promise
    const localOutput = `${outputPath}/${res.name}.${format}`;
    await new Promise<void>((resolve, reject) => {
      ffmpeg(sourcePath)
        .size(`${res.width}x${res.height}`)
        .videoBitrate(res.bitrate)
        .format(format)
        .output(localOutput)
        .on('end', () => resolve())
        .on('error', reject)
        .run();
    });

    const url = await uploadToCDN(localOutput, `${outputPath}/${res.name}.${format}`);
    results[res.name] = url;

    await job.log(`Completed ${res.name}: ${url}`);
  }

  // Generate thumbnail
  await job.updateProgress(98, 'Generating thumbnail...');
  const thumbnail = await generateThumbnail(sourcePath);
  results.thumbnail = await uploadToCDN(thumbnail, `${outputPath}/thumb.jpg`);

  // Webhook notification
  if (webhookUrl) {
    await fetch(webhookUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ jobId: job.id, status: 'completed', results }),
    });
  }

  await job.updateProgress(100, 'Complete');

  return {
    outputs: results,
    completedAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 2 });
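generateThumbnail is likewise application code. One possible shape using fluent-ffmpeg's screenshots API (output location and timestamp are illustrative):

// Hypothetical helper: grab a single frame at 10% of the video's duration.
function generateThumbnail(sourcePath: string): Promise<string> {
  const folder = '/tmp';
  const filename = 'thumb.jpg';
  return new Promise((resolve, reject) => {
    ffmpeg(sourcePath)
      .screenshots({ timemarks: ['10%'], folder, filename })
      .on('end', () => resolve(`${folder}/${filename}`))
      .on('error', reject);
  });
}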
// Transcode uploaded video
await transcodeQueue.add('transcode-video', {
  sourceUrl: 'https://uploads.example.com/raw/video-123.mov',
  outputPath: 'videos/123',
  resolutions: [
    { name: '1080p', width: 1920, height: 1080, bitrate: '5000k' },
    { name: '720p', width: 1280, height: 720, bitrate: '2500k' },
    { name: '480p', width: 854, height: 480, bitrate: '1000k' },
    { name: '360p', width: 640, height: 360, bitrate: '500k' },
  ],
  format: 'mp4',
  webhookUrl: 'https://api.example.com/webhooks/transcode',
});

Graceful Shutdown Pattern
Production-ready shutdown handling for all workers.
import { Queue, Worker, shutdownManager } from 'bunqueue/client';
// Initialize all queues and workers
const queues = {
  emails: new Queue('emails', { embedded: true }),
  reports: new Queue('reports', { embedded: true }),
  webhooks: new Queue('webhooks', { embedded: true }),
};

const workers = {
  emails: new Worker('emails', emailProcessor, { embedded: true, concurrency: 10 }),
  reports: new Worker('reports', reportProcessor, { embedded: true, concurrency: 3 }),
  webhooks: new Worker('webhooks', webhookProcessor, { embedded: true, concurrency: 20 }),
};

// Graceful shutdown handler
async function gracefulShutdown(signal: string) {
  console.log(`\nReceived ${signal}. Starting graceful shutdown...`);

  // 1. Stop accepting new jobs
  console.log('Pausing all workers...');
  for (const worker of Object.values(workers)) {
    worker.pause();
  }

  // 2. Wait for active jobs to complete (with timeout)
  console.log('Waiting for active jobs to complete...');
  const shutdownTimeout = 30000; // 30 seconds

  try {
    await Promise.race([
      Promise.all(Object.values(workers).map(w => w.close())),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
      ),
    ]);
    console.log('All workers closed gracefully');
  } catch (err) {
    console.log('Forcing worker shutdown...');
    await Promise.all(Object.values(workers).map(w => w.close(true)));
  }

  // 3. Close queue connections
  console.log('Closing queue connections...');
  await Promise.all(Object.values(queues).map(q => q.close()));

  // 4. Shutdown the embedded manager (flushes SQLite)
  shutdownManager();

  console.log('Shutdown complete');
  process.exit(0);
}

// Register signal handlers
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));

// Handle uncaught errors
process.on('uncaughtException', async (err) => {
  console.error('Uncaught exception:', err);
  await gracefulShutdown('uncaughtException');
});

process.on('unhandledRejection', async (reason) => {
  console.error('Unhandled rejection:', reason);
  await gracefulShutdown('unhandledRejection');
});