Building Reliable Workers with Stall Detection
A job queue is only as reliable as its workers. If a worker crashes mid-processing, what happens to the job? Without proper safeguards, it’s either lost forever or stuck in an “active” state. bunqueue solves this with stall detection and lock-based ownership.
The Problem: Zombie Jobs
Consider this scenario:
- Worker pulls job #42
- Worker starts processing
- Worker process crashes (OOM, unhandled exception, hardware failure)
- Job #42 is stuck in “active” state forever
No error handler fires. No retry logic runs. The job simply disappears into the void.
Solution: Heartbeat + Lock Protocol
bunqueue workers send periodic heartbeats to prove they’re alive. If a heartbeat is missed, the job is considered “stalled” and returned to the queue.
```typescript
const worker = new Worker('emails', processor, {
  embedded: true,
  heartbeatInterval: 10_000, // Send heartbeat every 10 seconds
  useLocks: true,            // Enable lock-based ownership
  lockDuration: 30_000,      // Lock expires after 30 seconds
});
```
The flow:
- Worker pulls a job and acquires a lock with a unique token
- Every `heartbeatInterval` ms, the worker renews the lock
- If the lock expires (worker died), the stall detector reclaims the job
- The job is returned to the queue with `stallCount++`
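As a mental model for the worker side of this protocol, renewal is just a timer that refreshes the lock before it can expire. Here is a minimal sketch; `renewLock` is a hypothetical stand-in for whatever message the worker actually sends, not a bunqueue API:

```typescript
// Illustrative sketch of worker-side lock renewal. `renewLock` is a
// hypothetical placeholder for the worker's actual heartbeat message.
function startHeartbeat(
  jobId: string,
  token: string,
  heartbeatInterval: number,
  renewLock: (jobId: string, token: string) => Promise<boolean>,
): () => void {
  const timer = setInterval(async () => {
    const renewed = await renewLock(jobId, token);
    if (!renewed) {
      // The lock was already reclaimed; stop heartbeating. Any later
      // ack with this token will be rejected (see the locks section).
      clearInterval(timer);
    }
  }, heartbeatInterval);
  return () => clearInterval(timer); // Call when the job finishes
}
```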
Configuring Stall Detection
In embedded mode, you can fine-tune stall behavior per queue:
```typescript
const queue = new Queue('payments', { embedded: true });

queue.setStallConfig({
  enabled: true,
  stallInterval: 30_000, // Check for stalls every 30 seconds
  maxStalls: 3,          // After 3 stalls, move to DLQ
  gracePeriod: 5_000,    // New jobs get 5s grace before stall check
});
```
The `maxStalls` parameter is critical. If a job consistently stalls (perhaps due to a bug that causes OOM), it gets moved to the Dead Letter Queue after `maxStalls` attempts instead of retrying forever.
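To make the escalation concrete, here is a simplified model of what one detection pass might do. This is an illustrative sketch, not bunqueue's actual internals; the `ActiveJob` shape and the `requeue`/`moveToDLQ` callbacks are hypothetical:

```typescript
// Simplified model of a single stall-detection pass (illustrative only).
interface ActiveJob {
  id: string;
  startedAt: number;     // Unix ms, when the job became active
  lockExpiresAt: number; // Unix ms, pushed forward by each heartbeat
  stallCount: number;
}

function detectStalls(
  active: ActiveJob[],
  opts: { maxStalls: number; gracePeriod: number },
  requeue: (job: ActiveJob) => void,
  moveToDLQ: (job: ActiveJob) => void,
): void {
  const now = Date.now();
  for (const job of active) {
    // New jobs get a grace period before they can be declared stalled
    if (now - job.startedAt < opts.gracePeriod) continue;
    // A live worker keeps lockExpiresAt in the future via heartbeats
    if (now < job.lockExpiresAt) continue;

    job.stallCount++;
    if (job.stallCount >= opts.maxStalls) {
      moveToDLQ(job); // Consistent staller: stop retrying
    } else {
      requeue(job);   // Return to the queue for another attempt
    }
  }
}
```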
Lock Ownership: Preventing Double Processing
Locks prevent a subtle but dangerous bug: double processing. Without locks, this can happen:
- Worker A pulls job #42
- Worker A’s heartbeat is delayed (GC pause, network hiccup)
- Stall detector thinks the job is stalled, returns it to the queue
- Worker B pulls job #42 and starts processing
- Worker A recovers from GC pause and continues processing job #42
- Both workers are processing the same job
With locks, Worker A’s ack/fail is rejected because its lock token is no longer valid:
```typescript
// Worker A tries to ack after its lock expired
// Server verifies the lock token
// Token mismatch -> ACK rejected
// Worker B's token is current -> Worker B's ACK succeeds
```
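The check itself is a single token comparison. A simplified model, assuming the server keeps the current lock token per job (names here are illustrative, not bunqueue's internals):

```typescript
// Simplified model of server-side ack verification (illustrative names).
const currentLockToken = new Map<string, string>(); // jobId -> token

function handleAck(jobId: string, token: string): 'acked' | 'rejected' {
  // Only the holder of the *current* lock token may complete the job
  if (currentLockToken.get(jobId) !== token) {
    return 'rejected'; // Worker A's stale token lands here
  }
  currentLockToken.delete(jobId);
  // ...mark the job completed and notify listeners...
  return 'acked';
}
```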
Heartbeat Implementation
The heartbeat runs as a timer inside the worker:
```typescript
const worker = new Worker('tasks', async (job) => {
  // Long-running operation
  for (let i = 0; i < 100; i++) {
    await processChunk(i);
    await job.updateProgress(i);
    // Heartbeat runs automatically in the background
  }
  return { processed: true };
}, {
  heartbeatInterval: 10_000,
  concurrency: 5,
});
```
You don't need to manually send heartbeats. The worker handles it automatically for all active jobs. In TCP mode, heartbeats are batched into a single `JobHeartbeatB` command when multiple jobs are active.
Graceful Shutdown
When stopping a worker, you want to finish current jobs rather than abandoning them:
```typescript
// Graceful: waits for active jobs to complete
await worker.close();

// Force: immediately stops (jobs will be reclaimed by stall detector)
await worker.close(true);
```
The graceful shutdown:
- Stops polling for new jobs
- Waits for in-flight jobs to complete
- Flushes the ACK batcher (sends pending acks)
- Closes the TCP connection
```typescript
// Handle process signals for clean shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down worker...');
  await worker.close(); // Graceful
  process.exit(0);
});
```
Handling Stalled Events
Workers emit events when stalls are detected:
```typescript
worker.on('stalled', (jobId, prev) => {
  console.warn(`Job ${jobId} was stalled (previous state: ${prev})`);
  // Alert your monitoring system
});

worker.on('failed', (job, err) => {
  if (err.message.includes('Lock expired')) {
    console.warn(`Job ${job.id} lost its lock - was reclaimed`);
  }
});
```
Best Practices
- Set `heartbeatInterval` to 1/3 of `lockDuration` - this gives 2 renewal retries before lock expiry
- Use `maxStalls: 3` for production - prevents infinite retry loops
- Always use graceful shutdown - `worker.close()` without `force: true`
- Monitor stall events - frequent stalls indicate worker health issues
- Set appropriate timeouts on your processor function - don’t let jobs run forever
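Putting these rules together in one place; a sketch that reuses the options shown earlier, where `sendEmail` and the `withTimeout` helper are hypothetical, not part of bunqueue's API:

```typescript
// Configuration following the best practices above. `withTimeout` and
// `sendEmail` are hypothetical; the Queue/Worker options come from the
// earlier examples.
const LOCK_DURATION = 30_000;

// Generic timeout guard: rejects if the job runs too long
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return Promise.race([
    promise,
    new Promise<T>((_, reject) =>
      setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms),
    ),
  ]);
}

const queue = new Queue('emails', { embedded: true });
queue.setStallConfig({
  enabled: true,
  stallInterval: 30_000,
  maxStalls: 3,       // Cap retries for consistently stalling jobs
  gracePeriod: 5_000,
});

const worker = new Worker('emails', async (job) => {
  // Don't let a single job run forever
  return withTimeout(sendEmail(job.data), 60_000);
}, {
  embedded: true,
  useLocks: true,
  lockDuration: LOCK_DURATION,
  heartbeatInterval: LOCK_DURATION / 3, // 2 renewal chances before expiry
});

process.on('SIGTERM', async () => {
  await worker.close(); // Graceful: finish in-flight jobs first
  process.exit(0);
});
```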