Worker API — Process Background Jobs with Concurrency & Retries
The Worker class processes jobs from a queue.
Creating a Worker
Section titled “Creating a Worker”import { Worker } from 'bunqueue/client';
const worker = new Worker('my-queue', async (job) => { // Process the job return { success: true };}, { embedded: true });Options
Section titled “Options”const worker = new Worker('queue', processor, { // Mode embedded: true, // Required for embedded mode
// Concurrency concurrency: 5, // Process 5 jobs in parallel (default: 1)
// Startup autorun: true, // Start automatically (default: true)
// Heartbeats & Stall Detection heartbeatInterval: 10000, // Send heartbeat every 10s (default: 10000, 0 = disabled)
// Batch Pulling (performance optimization) batchSize: 10, // Jobs to pull per request (default: 10, max: 1000) pollTimeout: 5000, // Long-poll timeout in ms (default: 0, max: 30000)
// Lock-Based Ownership (BullMQ-style) useLocks: true, // Enable job locks (default: true)
// Rate Limiting limiter: { max: 10, // Max 10 jobs per duration window duration: 1000, // Window of 1 second groupKey: 'userId', // Optional: rate limit per group key in job data },
// Lock & Stall Tuning lockDuration: 30000, // Job lock TTL in ms (default: 30000) maxStalledCount: 1, // Max stalls before job moves to failed (default: 1) skipStalledCheck: false, // Skip stall detection (default: false) skipLockRenewal: false, // Skip heartbeat lock renewal (default: false) drainDelay: 5000, // Pause when queue is empty (default: 5000)
// Auto-Cleanup removeOnComplete: true, // Remove completed jobs immediately // removeOnComplete: 100, // Keep last 100 completed jobs // removeOnComplete: { age: 3600000, count: 500 }, // Keep by age or count removeOnFail: false, // Same format as removeOnComplete
// TCP Connection (server mode only) connection: { host: 'localhost', port: 6789, token: 'secret', poolSize: 4, },});Connection pool sizing: When poolSize is not specified, it defaults to min(concurrency, 8). A worker with concurrency: 5 opens 5 TCP connections, while concurrency: 20 caps at 8. You can override this by setting poolSize explicitly.
Options Reference
Section titled “Options Reference”| Option | Type | Default | Description |
|---|---|---|---|
embedded | boolean | false | Use in-process mode |
concurrency | number | 1 | Parallel job processing |
autorun | boolean | true | Start automatically |
heartbeatInterval | number | 10000 | Heartbeat interval in ms (0 = disabled) |
batchSize | number | 10 | Jobs to pull per batch (max: 1000) |
pollTimeout | number | 0 | Long-poll timeout in ms (max: 30000) |
useLocks | boolean | true | Enable BullMQ-style job locks |
limiter | RateLimiterOptions | — | Rate limiter for job processing ({ max, duration, groupKey? }) |
lockDuration | number | 30000 | How long a job lock lasts before expiring (ms) |
maxStalledCount | number | 1 | Max times a job can stall before moving to failed |
skipStalledCheck | boolean | false | Skip stalled job detection |
skipLockRenewal | boolean | false | Skip lock renewal via heartbeat |
drainDelay | number | 5000 | Pause duration when queue is drained (ms) |
removeOnComplete | boolean | number | KeepJobs | false | Auto-remove completed jobs. true = immediate, number = max count, { age?, count? } = by age/count |
removeOnFail | boolean | number | KeepJobs | false | Auto-remove failed jobs. Same format as removeOnComplete |
Job Object
Section titled “Job Object”Inside the processor, you have access to the job object:
const worker = new Worker('queue', async (job) => { job.id; // Job ID job.name; // Job name job.data; // Job data job.queueName; // Queue name job.attemptsMade; // Current attempt number job.timestamp; // When job was created job.progress; // Current progress (0-100)
// Update progress await job.updateProgress(50, 'Halfway done');
// Log messages await job.log('Processing step 1');
return result;}, { embedded: true });Events
Section titled “Events”All events are fully typed — TypeScript will autocomplete event names and infer callback parameter types.
worker.on('ready', () => { console.log('Worker is ready');});
worker.on('active', (job) => { console.log(`Started: ${job.id}`);});
worker.on('completed', (job, result) => { console.log(`Completed: ${job.id}`, result);});
worker.on('failed', (job, error) => { console.error(`Failed: ${job.id}`, error.message);});
worker.on('progress', (job, progress) => { console.log(`Progress: ${job.id} - ${progress}%`);});
worker.on('stalled', (jobId, reason) => { console.warn(`Stalled: ${jobId} (${reason})`);});
worker.on('drained', () => { console.log('No more jobs in queue');});
worker.on('error', (error) => { console.error('Worker error:', error);});
worker.on('cancelled', ({ jobId, reason }) => { console.log(`Cancelled: ${jobId} - ${reason}`);});
worker.on('closed', () => { console.log('Worker closed');});Event Reference
Section titled “Event Reference”| Event | Callback Parameters | Description |
|---|---|---|
ready | () | Worker started polling |
active | (job: Job<T>) | Job started processing |
completed | (job: Job<T>, result: R) | Job completed successfully |
failed | (job: Job<T>, error: Error) | Job processing failed |
progress | (job: Job<T> | null, progress: number) | Job progress updated |
stalled | (jobId: string, reason: string) | Job stalled (no heartbeat) |
drained | () | Queue has no more waiting jobs |
error | (error: Error) | Worker-level error |
cancelled | ({ jobId: string, reason: string }) | Job was cancelled |
closed | () | Worker shut down |
Control
Section titled “Control”// Start processing (if autorun: false)worker.run();
// Pause processingworker.pause();
// Resume processingworker.resume();
// Stop the workerawait worker.close(); // Wait for active jobsawait worker.close(true); // Force close immediatelyHeartbeats
Section titled “Heartbeats”Workers automatically send heartbeats while processing jobs. This enables stall detection - if a job doesn’t receive a heartbeat within the configured interval, it’s considered stalled.
const worker = new Worker('queue', processor, { embedded: true, heartbeatInterval: 5000, // Send heartbeat every 5 seconds});Tip: The heartbeatInterval should be less than the queue’s stallInterval to avoid false positives.
See Stall Detection for more details.
Lock-Based Ownership
Section titled “Lock-Based Ownership”When useLocks: true (default), workers use BullMQ-style lock tokens:
- Each job gets a unique lock token when pulled
- Workers must provide the token when acknowledging/failing jobs
- Prevents job theft between workers
- Lock is renewed via heartbeats
- Heartbeats support a custom
durationparameter to extend the lock for a specific TTL instead of using the default
const worker = new Worker('queue', processor, { embedded: true, useLocks: false, // Rely on stall detection only (embedded mode)});Batch Pulling
Section titled “Batch Pulling”For better performance with many jobs, use batch pulling:
const worker = new Worker('queue', processor, { embedded: true, batchSize: 100, // Pull 100 jobs at once pollTimeout: 5000, // Wait up to 5s for jobs (long polling)});Error Handling
Section titled “Error Handling”const worker = new Worker('queue', async (job) => { try { await riskyOperation(); } catch (error) { // Job will be retried if attempts remain throw error; }}, { embedded: true });
// Handle at worker levelworker.on('failed', (job, error) => { if (job.attemptsMade >= 3) { // Final failure - alert someone alertOps(job, error); }});SandboxedWorker
Section titled “SandboxedWorker”SandboxedWorker runs processors in isolated Bun Worker threads for crash isolation and memory protection. It is designed for CPU-intensive tasks or untrusted code that could crash the process.
import { SandboxedWorker } from 'bunqueue/client';
// Embedded mode (in-process)const worker = new SandboxedWorker('cpu-intensive', { processor: './processor.ts', // Path to processor file concurrency: 4, // 4 parallel worker processes timeout: 60000, // 60s timeout per job (default: 30000) maxMemory: 256, // MB per worker (default: 256) maxRestarts: 10, // Auto-restart limit (default: 10) autoRestart: true, // Auto-restart crashed workers (default: true) pollInterval: 10, // Job poll interval in ms (default: 10)});
await worker.start();Options Reference
Section titled “Options Reference”| Option | Type | Default | Description |
|---|---|---|---|
processor | string | — | Path to processor file (required) |
concurrency | number | 1 | Number of parallel worker threads |
maxMemory | number | 256 | Max memory per worker thread in MB |
timeout | number | 30000 | Job processing timeout in ms |
autoRestart | boolean | true | Auto-restart crashed workers |
maxRestarts | number | 10 | Max restart attempts per worker |
pollInterval | number | 10 | Job poll interval in ms |
heartbeatInterval | number | 5000 (embedded) / 10000 (TCP) | Heartbeat interval for stall detection / lock renewal |
autoStart | boolean | false | Auto-restart worker pool when new jobs arrive after idle shutdown |
autoStartPollMs | number | 5000 | Poll interval for checking new jobs while idle-stopped |
connection | ConnectionOptions | — | TCP connection config (omit for embedded) |
TCP Mode
Section titled “TCP Mode”SandboxedWorker also supports TCP mode for connecting to a remote bunqueue server. Pass a connection option to enable it:
import { SandboxedWorker } from 'bunqueue/client';
// TCP mode - connects to bunqueue serverconst worker = new SandboxedWorker('cpu-intensive', { processor: './processor.ts', concurrency: 4, connection: { host: 'localhost', port: 6789, token: 'my-auth-token', // Optional auth }, heartbeatInterval: 10000, // Lock renewal interval (default: 10000 for TCP)});
await worker.start();Processor file (processor.ts):
export default async (job: { id: string; data: any; queue: string; attempts: number; parentId?: string; progress: (value: number) => void; log: (message: string) => void; fail: (error: string | Error) => void;}) => { job.log('Starting heavy computation'); job.progress(50); const result = await heavyComputation(job.data); job.progress(100); job.log('Computation finished'); return result;};Worker vs SandboxedWorker
Section titled “Worker vs SandboxedWorker”| Worker | SandboxedWorker | |
|---|---|---|
| Production ready | ✅ Stable, no experimental APIs | ⚠️ Depends on experimental Bun Workers |
| I/O-bound tasks (HTTP, DB, APIs) | ✅ Best choice | Overkill |
| CPU-intensive tasks | ⚠️ Blocks event loop | ✅ Runs in separate thread |
| Untrusted code | ❌ Runs in main thread | ✅ Isolated |
| Memory leak protection | ❌ | ✅ Per-worker memory limit |
| Crash isolation | ❌ | ✅ Only the worker thread dies |
| Events | 10 events | 8 events (no stalled, drained, cancelled) |
| Concurrency, retries, heartbeats | ✅ | ✅ Same behavior |
SandboxedWorker Events
Section titled “SandboxedWorker Events”SandboxedWorker supports 8 events. Note that stalled, drained, and cancelled are not available — these are only on the regular Worker.
worker.on('ready', () => { console.log('Worker pool is ready');});
worker.on('active', (job) => { console.log(`Job ${job.id} dispatched to worker process`);});
worker.on('completed', (job, result) => { console.log(`Job ${job.id} completed:`, result);});
worker.on('failed', (job, error) => { console.error(`Job ${job.id} failed:`, error.message);});
worker.on('progress', (job, progress) => { console.log(`Job ${job.id} progress: ${progress}%`);});
worker.on('log', (job, message) => { console.log(`Job ${job.id} log: ${message}`);});
worker.on('error', (error) => { console.error('Worker error:', error);});
worker.on('closed', () => { console.log('Worker pool stopped');});Event Reference
Section titled “Event Reference”| Event | Callback Parameters | Description |
|---|---|---|
ready | () | Worker pool started and all threads spawned |
active | (job: Job) | Job dispatched to a worker thread |
completed | (job: Job, result: unknown) | Job completed successfully |
failed | (job: Job, error: Error) | Job failed, timed out, or worker crashed |
progress | (job: Job, progress: number) | Job progress updated (0-100) |
log | (job: Job, message: string) | Log message from processor via job.log() |
error | (error: Error) | Worker-level error (dispatch failure, heartbeat error, crash) |
closed | () | Worker pool stopped |
SandboxedWorker API
Section titled “SandboxedWorker API”// Start the worker poolworker.start();
// Get statsconst stats = worker.getStats();// { total: 4, busy: 2, idle: 2, restarts: 0 }
// Graceful shutdownawait worker.stop();Lifecycle & Shutdown
Section titled “Lifecycle & Shutdown”When using embedded mode, call shutdownManager() to cleanly shut down the shared QueueManager (flushes write buffer, closes SQLite). In TCP mode, use closeSharedTcpClient() to close the shared TCP connection pool.
import { shutdownManager, closeSharedTcpClient } from 'bunqueue/client';
process.on('SIGINT', async () => { await worker.close();
// Embedded mode: shut down the shared QueueManager shutdownManager();
// TCP mode: close the shared TCP connection pool closeSharedTcpClient();
process.exit(0);});| Function | Mode | Description |
|---|---|---|
shutdownManager() | Embedded | Shuts down the shared QueueManager, flushes pending writes, closes SQLite |
closeSharedTcpClient() | TCP | Closes the shared TCP client connection |
closeAllSharedPools() | TCP | Closes all shared TCP connection pools |
CPU-Intensive Workers (TCP)
Section titled “CPU-Intensive Workers (TCP)”When processing CPU-heavy jobs over TCP, synchronous work can block the event loop and cause connection drops. See the dedicated CPU-Intensive Workers guide for connection tuning, yield patterns, and timeout reference.
Complete Example
Section titled “Complete Example”import { Queue, Worker, shutdownManager } from 'bunqueue/client';
interface EmailJob { to: string; subject: string; body: string;}
const queue = new Queue<EmailJob>('emails', { embedded: true });
const worker = new Worker<EmailJob>('emails', async (job) => { console.log(`Sending email to: ${job.data.to}`);
await job.updateProgress(50, 'Composing email...'); await job.log(`Subject: ${job.data.subject}`);
// Simulate sending await Bun.sleep(100);
await job.updateProgress(100, 'Sent!'); return { sent: true, timestamp: Date.now() };}, { embedded: true, concurrency: 5, heartbeatInterval: 5000,});
worker.on('completed', (job, result) => { console.log(`✓ Email sent: ${job.id}`);});
worker.on('failed', (job, error) => { console.error(`✗ Email failed: ${job.id} - ${error.message}`);});
// Graceful shutdownprocess.on('SIGINT', async () => { console.log('Shutting down...'); await worker.close(); shutdownManager(); process.exit(0);});