Queue Group
QueueGroup provides namespace isolation for related queues. All queues in a group share a common prefix.
Basic Usage
import { QueueGroup } from 'bunqueue/client';
// Create a group with namespaceconst billing = new QueueGroup('billing');
// Get queues (automatically prefixed, pass embedded: true for each)const invoices = billing.getQueue('invoices', { embedded: true }); // → "billing:invoices"const payments = billing.getQueue('payments', { embedded: true }); // → "billing:payments"
// Add jobsawait invoices.add('create', { amount: 100 });await payments.add('process', { orderId: '123' });Creating Workers
// Create worker for a queue in the groupconst invoiceWorker = billing.getWorker('invoices', async (job) => { console.log('Processing invoice:', job.data); return { processed: true };}, { embedded: true });
const paymentWorker = billing.getWorker('payments', async (job) => { console.log('Processing payment:', job.data); return { processed: true };}, { embedded: true });Listing Queues
// List all queues in the group (without prefix)const queues = billing.listQueues();// ['invoices', 'payments']Bulk Operations
Perform operations on all queues in the group at once:
// Pause all queues in the groupbilling.pauseAll();
// Resume all queues in the groupbilling.resumeAll();
// Drain all queues (remove waiting jobs)billing.drainAll();
// Obliterate all queues (remove all data)billing.obliterateAll();Options
Pass options when creating queues or workers:
const billing = new QueueGroup('billing');
// Queue with options (embedded: true required for in-process mode)const invoices = billing.getQueue<InvoiceData>('invoices', { embedded: true, defaultJobOptions: { attempts: 5, backoff: 2000, }});
// Worker with optionsconst worker = billing.getWorker('invoices', processor, { embedded: true, concurrency: 10,});Use Cases
Multi-Tenant Applications
// Create a group per tenantconst tenantA = new QueueGroup('tenant-a');const tenantB = new QueueGroup('tenant-b');
// Each tenant has isolated queues (pass embedded: true to each queue)const tasksA = tenantA.getQueue('tasks', { embedded: true });const tasksB = tenantB.getQueue('tasks', { embedded: true });
// Jobs are isolatedawait tasksA.add('process', { tenantId: 'a' });await tasksB.add('process', { tenantId: 'b' });Microservice Domains
// Group queues by domainconst orders = new QueueGroup('orders');const notifications = new QueueGroup('notifications');const analytics = new QueueGroup('analytics');
// Each domain has its own queues (pass embedded: true to each)const orderQueue = orders.getQueue('process', { embedded: true });const emailQueue = notifications.getQueue('email', { embedded: true });const eventQueue = analytics.getQueue('events', { embedded: true });Environment Separation
const env = process.env.NODE_ENV || 'development';const group = new QueueGroup(`${env}-tasks`);
const queue = group.getQueue('jobs', { embedded: true });// → "development-tasks:jobs" or "production-tasks:jobs"Methods Reference
| Method | Description |
|---|---|
getQueue(name, opts?) | Get a queue within the group |
getWorker(name, processor, opts?) | Create a worker for a queue in the group |
listQueues() | List all queue names in the group (without prefix) |
pauseAll() | Pause all queues in the group |
resumeAll() | Resume all queues in the group |
drainAll() | Remove waiting jobs from all queues |
obliterateAll() | Remove all data from all queues |
Complete Example
import { QueueGroup, shutdownManager } from 'bunqueue/client';
interface OrderData { orderId: string; amount: number;}
interface NotificationData { userId: string; message: string;}
// Create groupsconst orders = new QueueGroup('orders');const notifications = new QueueGroup('notifications');
// Create queuesconst orderQueue = orders.getQueue<OrderData>('process', { embedded: true });const emailQueue = notifications.getQueue<NotificationData>('email', { embedded: true });
// Create workersconst orderWorker = orders.getWorker<OrderData>('process', async (job) => { console.log(`Processing order: ${job.data.orderId}`);
// Create notification after order await emailQueue.add('order-confirmation', { userId: 'user-123', message: `Order ${job.data.orderId} confirmed!`, });
return { processed: true };}, { embedded: true, concurrency: 5 });
const emailWorker = notifications.getWorker<NotificationData>('email', async (job) => { console.log(`Sending email to: ${job.data.userId}`); return { sent: true };}, { embedded: true, concurrency: 3 });
// Add an orderawait orderQueue.add('new-order', { orderId: 'ORD-001', amount: 99.99 });
// Check queues in each groupconsole.log('Order queues:', orders.listQueues());console.log('Notification queues:', notifications.listQueues());
// Graceful shutdownprocess.on('SIGINT', async () => { await orderWorker.close(); await emailWorker.close(); shutdownManager(); process.exit(0);});