Worker API
The Worker class processes jobs from a queue.
Creating a Worker
import { Worker } from 'bunqueue/client';
const worker = new Worker('my-queue', async (job) => {
  // Process the job
  return { success: true };
}, { embedded: true });

Options
const worker = new Worker('queue', processor, {
  // Mode
  embedded: true, // Required for embedded mode

  // Concurrency
  concurrency: 5, // Process 5 jobs in parallel (default: 1)

  // Startup
  autorun: true, // Start automatically (default: true)

  // Heartbeats & Stall Detection
  heartbeatInterval: 10000, // Send heartbeat every 10s (default: 10000, 0 = disabled)

  // Batch Pulling (performance optimization)
  batchSize: 10, // Jobs to pull per request (default: 10, max: 1000)
  pollTimeout: 5000, // Long-poll timeout in ms (default: 0, max: 30000)

  // Lock-Based Ownership (BullMQ-style)
  useLocks: true, // Enable job locks (default: true)

  // TCP Connection (server mode only)
  connection: {
    host: 'localhost',
    port: 6789,
    token: 'secret',
    poolSize: 4,
  },
});

Options Reference
| Option | Type | Default | Description |
|---|---|---|---|
| embedded | boolean | false | Use in-process mode |
| concurrency | number | 1 | Parallel job processing |
| autorun | boolean | true | Start automatically |
| heartbeatInterval | number | 10000 | Heartbeat interval in ms (0 = disabled) |
| batchSize | number | 10 | Jobs to pull per batch (max: 1000) |
| pollTimeout | number | 0 | Long-poll timeout in ms (max: 30000) |
| useLocks | boolean | true | Enable BullMQ-style job locks |
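The defaults and limits in the table can be checked before a worker is constructed. The following is a minimal sketch, not part of bunqueue: `WorkerOptionsSketch`, `DEFAULTS`, and `normalizeOptions` are hypothetical names, and only the defaults and maxima listed in the table are assumed. How bunqueue itself treats out-of-range values is not documented here, so this sketch rejects them instead of clamping.

```typescript
// Hypothetical helper, NOT a bunqueue API: applies the documented defaults
// and rejects values outside the documented ranges.
interface WorkerOptionsSketch {
  embedded?: boolean;
  concurrency?: number;
  autorun?: boolean;
  heartbeatInterval?: number;
  batchSize?: number;
  pollTimeout?: number;
  useLocks?: boolean;
}

// Defaults copied from the options table.
const DEFAULTS: Required<WorkerOptionsSketch> = {
  embedded: false,
  concurrency: 1,
  autorun: true,
  heartbeatInterval: 10000,
  batchSize: 10,
  pollTimeout: 0,
  useLocks: true,
};

function normalizeOptions(opts: WorkerOptionsSketch = {}): Required<WorkerOptionsSketch> {
  // Fill in every missing option with its documented default.
  const merged: Required<WorkerOptionsSketch> = {
    embedded: opts.embedded ?? DEFAULTS.embedded,
    concurrency: opts.concurrency ?? DEFAULTS.concurrency,
    autorun: opts.autorun ?? DEFAULTS.autorun,
    heartbeatInterval: opts.heartbeatInterval ?? DEFAULTS.heartbeatInterval,
    batchSize: opts.batchSize ?? DEFAULTS.batchSize,
    pollTimeout: opts.pollTimeout ?? DEFAULTS.pollTimeout,
    useLocks: opts.useLocks ?? DEFAULTS.useLocks,
  };

  if (!Number.isInteger(merged.concurrency) || merged.concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  if (merged.heartbeatInterval < 0) {
    throw new RangeError('heartbeatInterval must be >= 0 (0 disables heartbeats)');
  }
  if (merged.batchSize < 1 || merged.batchSize > 1000) {
    throw new RangeError('batchSize must be between 1 and 1000');
  }
  if (merged.pollTimeout < 0 || merged.pollTimeout > 30000) {
    throw new RangeError('pollTimeout must be between 0 and 30000 ms');
  }
  return merged;
}

// Only the overridden fields change; everything else keeps its default.
const normalized = normalizeOptions({ embedded: true, batchSize: 100 });
console.log(normalized.batchSize, normalized.concurrency);
```

Validating up front surfaces a typo such as `batchSize: 10000` at startup instead of at the first pull.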
Job Object
Inside the processor, you have access to the job object:
const worker = new Worker('queue', async (job) => {
  job.id; // Job ID
  job.name; // Job name
  job.data; // Job data
  job.queueName; // Queue name
  job.attemptsMade; // Current attempt number
  job.timestamp; // When job was created
  job.progress; // Current progress (0-100)

  // Update progress
  await job.updateProgress(50, 'Halfway done');

  // Log messages
  await job.log('Processing step 1');

  return result;
}, { embedded: true });

Events
worker.on('ready', () => {
  console.log('Worker is ready');
});

worker.on('active', (job) => {
  console.log(`Started: ${job.id}`);
});

worker.on('completed', (job, result) => {
  console.log(`Completed: ${job.id}`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Failed: ${job.id}`, error.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Progress: ${job.id} - ${progress}%`);
});

worker.on('error', (error) => {
  console.error('Worker error:', error);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

Control
// Start processing (if autorun: false)
worker.run();

// Pause processing
worker.pause();

// Resume processing
worker.resume();

// Stop the worker
await worker.close(); // Wait for active jobs
await worker.close(true); // Force close immediately

Heartbeats
Workers automatically send heartbeats while processing jobs. This enables stall detection: if no heartbeat arrives for a job within the queue’s stallInterval, the job is considered stalled.
const worker = new Worker('queue', processor, {
  embedded: true,
  heartbeatInterval: 5000, // Send heartbeat every 5 seconds
});

Tip: The heartbeatInterval should be less than the queue’s stallInterval to avoid false positives.
See Stall Detection for more details.
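To make the relationship between the two intervals concrete, here is a minimal sketch of how a heartbeat-based stall check could work. It is not bunqueue's implementation: `isStalled`, `heartbeatsPerStallWindow`, and the 30000 ms stall interval are assumptions chosen for illustration.

```typescript
// Illustrative only: a simplified model of heartbeat-based stall detection.
// Function names and the stall interval value are assumptions, not bunqueue internals.

/** A job counts as stalled when its last heartbeat is older than the stall interval. */
function isStalled(lastHeartbeatAt: number, now: number, stallIntervalMs: number): boolean {
  return now - lastHeartbeatAt > stallIntervalMs;
}

/** How many heartbeats fit inside one stall window (0 when heartbeats are disabled). */
function heartbeatsPerStallWindow(heartbeatIntervalMs: number, stallIntervalMs: number): number {
  if (heartbeatIntervalMs <= 0) return 0; // heartbeatInterval: 0 disables heartbeats
  return Math.floor(stallIntervalMs / heartbeatIntervalMs);
}

const assumedStallIntervalMs = 30000; // assumed queue setting, for illustration

// Heartbeats every 5s: the last one arrived at t=25000, the check runs at t=31000.
console.log(isStalled(25000, 31000, assumedStallIntervalMs)); // false

// Heartbeats every 40s: nothing has arrived since the job started at t=0.
console.log(isStalled(0, 31000, assumedStallIntervalMs)); // true

// A 5s interval leaves room for several missed heartbeats before a false positive.
console.log(heartbeatsPerStallWindow(5000, assumedStallIntervalMs)); // 6
```

In this model, a heartbeat interval longer than the stall interval marks a healthy job as stalled before its first heartbeat is due, which is the false positive the tip warns about.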
Lock-Based Ownership
When useLocks: true (default), workers use BullMQ-style lock tokens:
- Each job gets a unique lock token when pulled
- Workers must provide the token when acknowledging/failing jobs
- Prevents job theft between workers
- Lock is renewed via heartbeats
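The four points above can be modelled with a small in-memory lock table. This is a sketch of the general lock-token technique only; `LockTable`, its method names, the counter-based tokens, and the 30000 ms lock duration are all hypothetical and do not describe bunqueue's internals.

```typescript
// Illustrative only: an in-memory model of lock-token ownership.
// LockTable and its methods are hypothetical names, NOT bunqueue APIs.
interface Lock {
  token: string;
  expiresAt: number;
}

class LockTable {
  private locks = new Map<string, Lock>();
  private counter = 0;
  private readonly lockDurationMs: number;

  constructor(lockDurationMs: number) {
    this.lockDurationMs = lockDurationMs;
  }

  /** Pull: issues a fresh token, or null while another worker still holds the job. */
  acquire(jobId: string, now: number): string | null {
    const existing = this.locks.get(jobId);
    if (existing && existing.expiresAt > now) return null;
    const token = `lock-${++this.counter}`; // a real system would use a random token
    this.locks.set(jobId, { token, expiresAt: now + this.lockDurationMs });
    return token;
  }

  /** Heartbeat: extends the lock, but only for the current owner. */
  renew(jobId: string, token: string, now: number): boolean {
    const lock = this.locks.get(jobId);
    if (!lock || lock.token !== token || lock.expiresAt <= now) return false;
    lock.expiresAt = now + this.lockDurationMs;
    return true;
  }

  /** Ack or fail: accepted only with the matching token, then the lock is released. */
  release(jobId: string, token: string): boolean {
    const lock = this.locks.get(jobId);
    if (!lock || lock.token !== token) return false;
    this.locks.delete(jobId);
    return true;
  }
}

const table = new LockTable(30000);
const tokenA = table.acquire('job-1', 0); // worker A pulls the job
console.log(table.acquire('job-1', 1000)); // null: worker B cannot take it
console.log(table.release('job-1', 'wrong-token')); // false: ack without the token is rejected
console.log(table.renew('job-1', tokenA!, 20000)); // true: heartbeat extends the lock
console.log(table.release('job-1', tokenA!)); // true: the owner acknowledges the job
```

Because every acknowledgement carries the token, a worker whose lock has expired and been reissued cannot complete or fail a job that now belongs to another worker.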
To rely on stall detection alone, disable locks:
const worker = new Worker('queue', processor, {
  embedded: true,
  useLocks: false, // Rely on stall detection only (embedded mode)
});

Batch Pulling
For better performance with many jobs, use batch pulling:
const worker = new Worker('queue', processor, {
  embedded: true,
  batchSize: 100, // Pull 100 jobs at once
  pollTimeout: 5000, // Wait up to 5s for jobs (long polling)
});

Error Handling
const worker = new Worker('queue', async (job) => {
  try {
    await riskyOperation();
  } catch (error) {
    // Job will be retried if attempts remain
    throw error;
  }
}, { embedded: true });

// Handle at worker level
worker.on('failed', (job, error) => {
  if (job.attemptsMade >= 3) {
    // Final failure - alert someone
    alertOps(job, error);
  }
});

SandboxedWorker
For CPU-intensive tasks or jobs that might crash, use SandboxedWorker to run processors in isolated Bun Worker processes.
import { SandboxedWorker } from 'bunqueue/client';
const worker = new SandboxedWorker('cpu-intensive', {
  embedded: true, // Required for embedded mode
  processor: './processor.ts', // Path to processor file
  concurrency: 4, // 4 parallel worker processes
  timeout: 60000, // 60s timeout per job (default: 30000)
  maxMemory: 256, // MB per worker (default: 256)
  maxRestarts: 10, // Auto-restart limit (default: 10)
  autoRestart: true, // Auto-restart crashed workers (default: true)
  pollInterval: 10, // Job poll interval in ms (default: 10)
});

worker.start();

Processor file (processor.ts):
export default async (job: {
  id: string;
  data: any;
  queue: string;
  attempts: number;
  progress: (value: number) => void;
}) => {
  job.progress(50);
  const result = await heavyComputation(job.data);
  job.progress(100);
  return result;
};

When to Use SandboxedWorker
| Use Case | Worker | SandboxedWorker |
|---|---|---|
| Fast I/O tasks | ✅ | ❌ |
| CPU-intensive | ❌ | ✅ |
| Untrusted code | ❌ | ✅ |
| Memory leak protection | ❌ | ✅ |
| Crash isolation | ❌ | ✅ |
SandboxedWorker API
// Start the worker pool
worker.start();

// Get stats
const stats = worker.getStats();
// { total: 4, busy: 2, idle: 2, restarts: 0 }

// Graceful shutdown
await worker.stop();

Complete Example
import { Queue, Worker, shutdownManager } from 'bunqueue/client';
interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const queue = new Queue<EmailJob>('emails', { embedded: true });

const worker = new Worker<EmailJob>('emails', async (job) => {
  console.log(`Sending email to: ${job.data.to}`);

  await job.updateProgress(50, 'Composing email...');
  await job.log(`Subject: ${job.data.subject}`);

  // Simulate sending
  await Bun.sleep(100);

  await job.updateProgress(100, 'Sent!');
  return { sent: true, timestamp: Date.now() };
}, {
  embedded: true,
  concurrency: 5,
  heartbeatInterval: 5000,
});

worker.on('completed', (job, result) => {
  console.log(`✓ Email sent: ${job.id}`);
});

worker.on('failed', (job, error) => {
  console.error(`✗ Email failed: ${job.id} - ${error.message}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('Shutting down...');
  await worker.close();
  shutdownManager();
  process.exit(0);
});