Worker API — Process Background Jobs with Concurrency & Retries

The `Worker` class processes jobs from a queue.

```typescript
import { Worker } from 'bunqueue/client';

const worker = new Worker('my-queue', async (job) => {
  // Process the job
  return { success: true };
}, { embedded: true });
```
The full set of options:

```typescript
const worker = new Worker('queue', processor, {
  // Mode
  embedded: true, // Required for embedded mode

  // Concurrency
  concurrency: 5, // Process 5 jobs in parallel (default: 1)

  // Startup
  autorun: true, // Start automatically (default: true)

  // Heartbeats & Stall Detection
  heartbeatInterval: 10000, // Send heartbeat every 10s (default: 10000, 0 = disabled)

  // Batch Pulling (performance optimization)
  batchSize: 10, // Jobs to pull per request (default: 10, max: 1000)
  pollTimeout: 5000, // Long-poll timeout in ms (default: 0, max: 30000)

  // Lock-Based Ownership (BullMQ-style)
  useLocks: true, // Enable job locks (default: true)

  // Rate Limiting
  limiter: {
    max: 10, // Max 10 jobs per duration window
    duration: 1000, // Window of 1 second
    groupKey: 'userId', // Optional: rate limit per group key in job data
  },

  // Lock & Stall Tuning
  lockDuration: 30000, // Job lock TTL in ms (default: 30000)
  maxStalledCount: 1, // Max stalls before job moves to failed (default: 1)
  skipStalledCheck: false, // Skip stall detection (default: false)
  skipLockRenewal: false, // Skip heartbeat lock renewal (default: false)
  drainDelay: 5000, // Pause when queue is empty (default: 5000)

  // Auto-Cleanup
  removeOnComplete: true, // Remove completed jobs immediately
  // removeOnComplete: 100, // Keep last 100 completed jobs
  // removeOnComplete: { age: 3600000, count: 500 }, // Keep by age or count
  removeOnFail: false, // Same format as removeOnComplete

  // TCP Connection (server mode only)
  connection: {
    host: 'localhost',
    port: 6789,
    token: 'secret',
    poolSize: 4,
  },
});
```

Connection pool sizing: when `poolSize` is not specified, it defaults to `min(concurrency, 8)`. A worker with `concurrency: 5` opens 5 TCP connections, while one with `concurrency: 20` caps at 8. You can override this by setting `poolSize` explicitly.
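The default sizing rule above can be expressed as a one-liner (illustrative only; the actual sizing happens inside the client):

```typescript
// Default pool sizing when poolSize is omitted (illustrative):
// one TCP connection per concurrent job, capped at 8.
function defaultPoolSize(concurrency: number): number {
  return Math.min(concurrency, 8);
}

console.log(defaultPoolSize(5));  // 5
console.log(defaultPoolSize(20)); // 8
```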

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `embedded` | `boolean` | `false` | Use in-process mode |
| `concurrency` | `number` | `1` | Parallel job processing |
| `autorun` | `boolean` | `true` | Start automatically |
| `heartbeatInterval` | `number` | `10000` | Heartbeat interval in ms (`0` = disabled) |
| `batchSize` | `number` | `10` | Jobs to pull per batch (max: 1000) |
| `pollTimeout` | `number` | `0` | Long-poll timeout in ms (max: 30000) |
| `useLocks` | `boolean` | `true` | Enable BullMQ-style job locks |
| `limiter` | `RateLimiterOptions` | | Rate limiter for job processing (`{ max, duration, groupKey? }`) |
| `lockDuration` | `number` | `30000` | How long a job lock lasts before expiring (ms) |
| `maxStalledCount` | `number` | `1` | Max times a job can stall before moving to failed |
| `skipStalledCheck` | `boolean` | `false` | Skip stalled job detection |
| `skipLockRenewal` | `boolean` | `false` | Skip lock renewal via heartbeat |
| `drainDelay` | `number` | `5000` | Pause duration when queue is drained (ms) |
| `removeOnComplete` | `boolean \| number \| KeepJobs` | `false` | Auto-remove completed jobs. `true` = immediate, number = max count, `{ age?, count? }` = by age/count |
| `removeOnFail` | `boolean \| number \| KeepJobs` | `false` | Auto-remove failed jobs. Same format as `removeOnComplete` |
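For example, the `limiter` option can throttle processing per user. A sketch under stated assumptions: `userId` is an assumed field in the job data, and `sendEmail` is a hypothetical application helper.

```typescript
import { Worker } from 'bunqueue/client';

// At most 10 jobs per second per distinct userId in job.data
const worker = new Worker('emails', async (job) => {
  await sendEmail(job.data); // assumed application helper
}, {
  embedded: true,
  limiter: {
    max: 10,         // jobs per window
    duration: 1000,  // 1-second window
    groupKey: 'userId', // separate budget per job.data.userId
  },
});
```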

Inside the processor, you have access to the job object:

```typescript
const worker = new Worker('queue', async (job) => {
  job.id; // Job ID
  job.name; // Job name
  job.data; // Job data
  job.queueName; // Queue name
  job.attemptsMade; // Current attempt number
  job.timestamp; // When the job was created
  job.progress; // Current progress (0-100)

  // Update progress
  await job.updateProgress(50, 'Halfway done');

  // Log messages
  await job.log('Processing step 1');

  return result;
}, { embedded: true });
```

All events are fully typed — TypeScript will autocomplete event names and infer callback parameter types.

```typescript
worker.on('ready', () => {
  console.log('Worker is ready');
});
worker.on('active', (job) => {
  console.log(`Started: ${job.id}`);
});
worker.on('completed', (job, result) => {
  console.log(`Completed: ${job.id}`, result);
});
worker.on('failed', (job, error) => {
  console.error(`Failed: ${job.id}`, error.message);
});
worker.on('progress', (job, progress) => {
  console.log(`Progress: ${job.id} - ${progress}%`);
});
worker.on('stalled', (jobId, reason) => {
  console.warn(`Stalled: ${jobId} (${reason})`);
});
worker.on('drained', () => {
  console.log('No more jobs in queue');
});
worker.on('error', (error) => {
  console.error('Worker error:', error);
});
worker.on('cancelled', ({ jobId, reason }) => {
  console.log(`Cancelled: ${jobId} - ${reason}`);
});
worker.on('closed', () => {
  console.log('Worker closed');
});
```
| Event | Callback Parameters | Description |
| --- | --- | --- |
| `ready` | `()` | Worker started polling |
| `active` | `(job: Job<T>)` | Job started processing |
| `completed` | `(job: Job<T>, result: R)` | Job completed successfully |
| `failed` | `(job: Job<T>, error: Error)` | Job processing failed |
| `progress` | `(job: Job<T> \| null, progress: number)` | Job progress updated |
| `stalled` | `(jobId: string, reason: string)` | Job stalled (no heartbeat) |
| `drained` | `()` | Queue has no more waiting jobs |
| `error` | `(error: Error)` | Worker-level error |
| `cancelled` | `({ jobId: string, reason: string })` | Job was cancelled |
| `closed` | `()` | Worker shut down |
```typescript
// Start processing (if autorun: false)
worker.run();

// Pause processing
worker.pause();

// Resume processing
worker.resume();

// Stop the worker
await worker.close(); // Wait for active jobs
await worker.close(true); // Force close immediately
```

Workers automatically send heartbeats while processing jobs. This enables stall detection: if a job doesn't receive a heartbeat within the configured interval, it's considered stalled.

```typescript
const worker = new Worker('queue', processor, {
  embedded: true,
  heartbeatInterval: 5000, // Send heartbeat every 5 seconds
});
```

Tip: `heartbeatInterval` should be less than the queue's `stallInterval` to avoid false positives.
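A quick sanity check for this constraint might look like the following (an illustrative helper, not part of the bunqueue API; `stallInterval` is configured on the queue side):

```typescript
// Guard against false stall positives: heartbeats must arrive
// at least once per stall-check window (illustrative helper).
function validateHeartbeat(heartbeatInterval: number, stallInterval: number): void {
  if (heartbeatInterval !== 0 && heartbeatInterval >= stallInterval) {
    throw new Error(
      `heartbeatInterval (${heartbeatInterval}ms) must be < stallInterval (${stallInterval}ms)`
    );
  }
}

validateHeartbeat(5000, 30000); // ok
```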

See Stall Detection for more details.

When `useLocks: true` (the default), workers use BullMQ-style lock tokens:

  • Each job gets a unique lock token when pulled
  • Workers must provide the token when acknowledging/failing jobs
  • Prevents job theft between workers
  • Lock is renewed via heartbeats
  • Heartbeats support a custom duration parameter to extend the lock for a specific TTL instead of using the default
To disable locks and rely on stall detection alone:

```typescript
const worker = new Worker('queue', processor, {
  embedded: true,
  useLocks: false, // Rely on stall detection only (embedded mode)
});
```

For better performance with many jobs, use batch pulling:

```typescript
const worker = new Worker('queue', processor, {
  embedded: true,
  batchSize: 100, // Pull 100 jobs at once
  pollTimeout: 5000, // Wait up to 5s for jobs (long polling)
});
```
Errors thrown by the processor fail the job; it is retried while attempts remain:

```typescript
const worker = new Worker('queue', async (job) => {
  try {
    await riskyOperation();
  } catch (error) {
    // Job will be retried if attempts remain
    throw error;
  }
}, { embedded: true });

// Handle at worker level
worker.on('failed', (job, error) => {
  if (job.attemptsMade >= 3) {
    // Final failure - alert someone
    alertOps(job, error);
  }
});
```

SandboxedWorker runs processors in isolated Bun Worker threads for crash isolation and memory protection. It is designed for CPU-intensive tasks or untrusted code that could crash the process.

```typescript
import { SandboxedWorker } from 'bunqueue/client';

// Embedded mode (in-process)
const worker = new SandboxedWorker('cpu-intensive', {
  processor: './processor.ts', // Path to processor file
  concurrency: 4, // 4 parallel worker threads
  timeout: 60000, // 60s timeout per job (default: 30000)
  maxMemory: 256, // MB per worker (default: 256)
  maxRestarts: 10, // Auto-restart limit (default: 10)
  autoRestart: true, // Auto-restart crashed workers (default: true)
  pollInterval: 10, // Job poll interval in ms (default: 10)
});

await worker.start();
```
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `processor` | `string` | | Path to processor file (required) |
| `concurrency` | `number` | `1` | Number of parallel worker threads |
| `maxMemory` | `number` | `256` | Max memory per worker thread in MB |
| `timeout` | `number` | `30000` | Job processing timeout in ms |
| `autoRestart` | `boolean` | `true` | Auto-restart crashed workers |
| `maxRestarts` | `number` | `10` | Max restart attempts per worker |
| `pollInterval` | `number` | `10` | Job poll interval in ms |
| `heartbeatInterval` | `number` | `5000` (embedded) / `10000` (TCP) | Heartbeat interval for stall detection / lock renewal |
| `autoStart` | `boolean` | `false` | Auto-restart worker pool when new jobs arrive after idle shutdown |
| `autoStartPollMs` | `number` | `5000` | Poll interval for checking new jobs while idle-stopped |
| `connection` | `ConnectionOptions` | | TCP connection config (omit for embedded) |

SandboxedWorker also supports TCP mode for connecting to a remote bunqueue server. Pass a `connection` option to enable it:

```typescript
import { SandboxedWorker } from 'bunqueue/client';

// TCP mode - connects to bunqueue server
const worker = new SandboxedWorker('cpu-intensive', {
  processor: './processor.ts',
  concurrency: 4,
  connection: {
    host: 'localhost',
    port: 6789,
    token: 'my-auth-token', // Optional auth
  },
  heartbeatInterval: 10000, // Lock renewal interval (default: 10000 for TCP)
});

await worker.start();
```

Processor file (processor.ts):

```typescript
export default async (job: {
  id: string;
  data: any;
  queue: string;
  attempts: number;
  parentId?: string;
  progress: (value: number) => void;
  log: (message: string) => void;
  fail: (error: string | Error) => void;
}) => {
  job.log('Starting heavy computation');
  job.progress(50);

  const result = await heavyComputation(job.data);

  job.progress(100);
  job.log('Computation finished');
  return result;
};
```
| | Worker | SandboxedWorker |
| --- | --- | --- |
| Production ready | ✅ Stable, no experimental APIs | ⚠️ Depends on experimental Bun Workers |
| I/O-bound tasks (HTTP, DB, APIs) | ✅ Best choice | Overkill |
| CPU-intensive tasks | ⚠️ Blocks event loop | ✅ Runs in separate thread |
| Untrusted code | ❌ Runs in main thread | ✅ Isolated |
| Memory leak protection | | ✅ Per-worker memory limit |
| Crash isolation | | ✅ Only the worker thread dies |
| Events | 10 events | 8 events (no `stalled`, `drained`, `cancelled`) |
| Concurrency, retries, heartbeats | | ✅ Same behavior |

SandboxedWorker supports 8 events. Note that `stalled`, `drained`, and `cancelled` are not available; they exist only on the regular Worker.

```typescript
worker.on('ready', () => {
  console.log('Worker pool is ready');
});
worker.on('active', (job) => {
  console.log(`Job ${job.id} dispatched to worker process`);
});
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});
worker.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error.message);
});
worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});
worker.on('log', (job, message) => {
  console.log(`Job ${job.id} log: ${message}`);
});
worker.on('error', (error) => {
  console.error('Worker error:', error);
});
worker.on('closed', () => {
  console.log('Worker pool stopped');
});
```
| Event | Callback Parameters | Description |
| --- | --- | --- |
| `ready` | `()` | Worker pool started and all threads spawned |
| `active` | `(job: Job)` | Job dispatched to a worker thread |
| `completed` | `(job: Job, result: unknown)` | Job completed successfully |
| `failed` | `(job: Job, error: Error)` | Job failed, timed out, or worker crashed |
| `progress` | `(job: Job, progress: number)` | Job progress updated (0-100) |
| `log` | `(job: Job, message: string)` | Log message from processor via `job.log()` |
| `error` | `(error: Error)` | Worker-level error (dispatch failure, heartbeat error, crash) |
| `closed` | `()` | Worker pool stopped |
```typescript
// Start the worker pool
await worker.start();

// Get stats
const stats = worker.getStats();
// { total: 4, busy: 2, idle: 2, restarts: 0 }

// Graceful shutdown
await worker.stop();
```

When using embedded mode, call `shutdownManager()` to cleanly shut down the shared QueueManager (flushes the write buffer, closes SQLite). In TCP mode, use `closeSharedTcpClient()` to close the shared TCP connection pool.

```typescript
import { shutdownManager, closeSharedTcpClient } from 'bunqueue/client';

process.on('SIGINT', async () => {
  await worker.close();

  // Embedded mode: shut down the shared QueueManager
  shutdownManager();

  // TCP mode: close the shared TCP connection pool
  closeSharedTcpClient();

  process.exit(0);
});
```
| Function | Mode | Description |
| --- | --- | --- |
| `shutdownManager()` | Embedded | Shuts down the shared QueueManager, flushes pending writes, closes SQLite |
| `closeSharedTcpClient()` | TCP | Closes the shared TCP client connection |
| `closeAllSharedPools()` | TCP | Closes all shared TCP connection pools |

When processing CPU-heavy jobs over TCP, synchronous work can block the event loop and cause connection drops. See the dedicated CPU-Intensive Workers guide for connection tuning, yield patterns, and timeout reference.
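A minimal sketch of the yield pattern in plain TypeScript (no bunqueue APIs involved): split the synchronous work into chunks and yield to the event loop between them, so heartbeat timers and socket reads are not starved.

```typescript
// Process a large array in chunks, yielding to the event loop between
// chunks so pending timers (heartbeats) and socket I/O can run.
async function processInChunks<T, R>(
  items: T[],
  fn: (item: T) => R,
  chunkSize = 1000,
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    for (const item of items.slice(i, i + chunkSize)) {
      results.push(fn(item));
    }
    // Yield: let queued callbacks run before the next chunk
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
  }
  return results;
}
```

Inside a processor, wrapping the CPU-heavy loop with a helper like this keeps the connection alive; in Bun, `await Bun.sleep(0)` works as the yield point as well.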

```typescript
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const queue = new Queue<EmailJob>('emails', { embedded: true });

const worker = new Worker<EmailJob>('emails', async (job) => {
  console.log(`Sending email to: ${job.data.to}`);

  await job.updateProgress(50, 'Composing email...');
  await job.log(`Subject: ${job.data.subject}`);

  // Simulate sending
  await Bun.sleep(100);

  await job.updateProgress(100, 'Sent!');
  return { sent: true, timestamp: Date.now() };
}, {
  embedded: true,
  concurrency: 5,
  heartbeatInterval: 5000,
});

worker.on('completed', (job, result) => {
  console.log(`✓ Email sent: ${job.id}`);
});

worker.on('failed', (job, error) => {
  console.error(`✗ Email failed: ${job.id} - ${error.message}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('Shutting down...');
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```