# Queue API — Add Jobs, Priorities, Delays & Deduplication in bunqueue

The `Queue` class is used to add and manage jobs.

## Creating a Queue

```ts
import { Queue } from 'bunqueue/client';

// Basic queue - embedded mode
const queue = new Queue('my-queue', { embedded: true });
```

```ts
// Typed queue
interface TaskData {
  userId: number;
  action: string;
}
const typedQueue = new Queue<TaskData>('tasks', { embedded: true });
```

```ts
// With default job options
const queue = new Queue('emails', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    backoff: 1000,
    removeOnComplete: true,
  },
});
```

### TCP Mode (Server)

```ts
// Connect to bunqueue server (no embedded option)
const queue = new Queue('tasks');
```

```ts
// With custom connection
const queue = new Queue('tasks', {
  connection: {
    host: '192.168.1.100',
    port: 6789,
    token: 'secret-token',
    poolSize: 4, // Connection pool size
  },
});
```
## Adding Jobs

### Single Job

```ts
const job = await queue.add('job-name', { key: 'value' });
```

```ts
// With options
const job = await queue.add('job-name', data, {
  priority: 10,           // Higher = processed first
  delay: 5000,            // Delay in ms before processing
  attempts: 5,            // Max retry attempts (default: 3)
  backoff: 2000,          // Backoff between retries (default: 1000ms, jitter applied)
  backoffConfig: {        // Advanced backoff configuration
    type: 'exponential',  // 'fixed' or 'exponential'
    delay: 2000,          // Base delay in ms
  },
  timeout: 30000,         // Job timeout in ms
  jobId: 'custom-id',     // Custom job ID for deduplication (BullMQ-style)
  removeOnComplete: true, // Remove job data after completion
  removeOnFail: false,    // Keep failed jobs
  stallTimeout: 60000,    // Per-job stall timeout (overrides queue config)
});
```
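As a rough illustration of how the `backoff` / `backoffConfig` options translate into wait times between retries (a sketch, not bunqueue's internals; the real scheduler also applies jitter on top of these base values):

```ts
type BackoffType = 'fixed' | 'exponential';

// Illustrative sketch of base retry-delay computation (jitter omitted).
// attempt is 1-based: the first retry of an 'exponential' policy waits the base delay.
function retryDelay(type: BackoffType, baseMs: number, attempt: number): number {
  return type === 'fixed' ? baseMs : baseMs * 2 ** (attempt - 1);
}

retryDelay('fixed', 2000, 3);       // 2000 - same wait every retry
retryDelay('exponential', 2000, 1); // 2000 - base delay
retryDelay('exponential', 2000, 3); // 8000 - doubles per attempt
```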
### Bulk Add

```ts
// Batch optimized - single lock, batch INSERT
const jobs = await queue.addBulk([
  { name: 'task-1', data: { id: 1 } },
  { name: 'task-2', data: { id: 2 }, opts: { priority: 10 } },
  { name: 'task-3', data: { id: 3 }, opts: { delay: 5000 } },
]);
```

### Repeatable Jobs

```ts
// Repeat every 5 seconds
await queue.add('heartbeat', {}, {
  repeat: {
    every: 5000,
  },
});
```

```ts
// Repeat with limit
await queue.add('daily-report', {}, {
  repeat: {
    every: 86400000, // 24 hours
    limit: 30,       // Max 30 repetitions
  },
});
```

```ts
// Cron pattern (server mode)
await queue.add('weekly', {}, {
  repeat: {
    pattern: '0 9 * * MON', // Every Monday at 9am
  },
});
```
### Updating Repeatable Job Data

You can update the data for the next repeat execution using `updateData()`. This works even after the current execution completes; the update propagates to the successor job automatically.

```ts
const job = await queue.add('sync', { endpoint: '/api/v1' }, {
  repeat: { every: 60000 },
});

// Update data for the next execution
await job.updateData({ endpoint: '/api/v2' });
// Next repeat will use { endpoint: '/api/v2' }
```
### Durable Jobs

By default, bunqueue uses a write buffer for high throughput: jobs are batched in memory and flushed to SQLite every 10ms. This achieves ~100k jobs/sec but means jobs could be lost if the process crashes before the buffer is flushed.

For critical jobs where data loss is unacceptable, use the `durable` option:
```ts
// Critical job: immediate disk write, guaranteed persistence
await queue.add('process-payment', { orderId: '123', amount: 99.99 }, {
  durable: true,
});
```

```ts
// Batch of critical jobs
await queue.addBulk([
  { name: 'payment-1', data: { orderId: '1' }, opts: { durable: true } },
  { name: 'payment-2', data: { orderId: '2' }, opts: { durable: true } },
]);
```
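To make the trade-off concrete, here is a toy model of a write buffer with a durable bypass. This is an illustrative sketch only, not bunqueue's implementation; the `persist` callback stands in for the batched SQLite write:

```ts
// Illustrative model of a write buffer with a durable bypass (not bunqueue's code).
class WriteBuffer<T> {
  private pending: T[] = [];

  constructor(private persist: (batch: T[]) => void) {}

  // Fast path: buffer in memory; lost if the process dies before flush().
  add(job: T): void {
    this.pending.push(job);
  }

  // Durable path: write through immediately, skipping the buffer.
  addDurable(job: T): void {
    this.persist([job]);
  }

  // Called on a timer (e.g. every 10ms) to batch-write buffered jobs.
  flush(): void {
    if (this.pending.length === 0) return;
    this.persist(this.pending);
    this.pending = [];
  }
}
```

Buffered jobs reach disk in one batched write per flush tick, which is where the throughput comes from; `durable` jobs pay one write each in exchange for surviving a crash in the 10ms window.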
### Job Deduplication (BullMQ-style)

Use `jobId` to prevent duplicate jobs. When a job with the same `jobId` already exists, the existing job is returned instead of creating a duplicate. This works in both embedded and TCP modes (including auto-batched operations), and matches BullMQ's idempotent behavior.

```ts
// Basic deduplication with jobId (BullMQ-style idempotency)
// If a job with the same jobId exists, the existing job is returned instead of creating a duplicate
const job = await queue.add('send-email', { to: 'user@test.com' }, {
  jobId: 'email-user-123',
});
```

```ts
// First call: creates the job
const job1 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123',
});

// Second call with same jobId: returns existing job (no duplicate)
const job2 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123',
});

console.log(job1.id === job2.id); // true - same job returned
```

```ts
// Example: Restore jobs on service startup
async function restoreJobs(jobsToRestore: SavedJob[]) {
  for (const saved of jobsToRestore) {
    // Safe: existing jobs are returned, not duplicated
    await queue.add('process', saved.data, { jobId: saved.id });
  }
}
```
## Advanced Deduplication

For more control over deduplication behavior, use the `deduplication` option with TTL-based unique keys and strategies.

### TTL-Based Deduplication

While `jobId` provides permanent idempotency (via `customId`), the `deduplication` option uses a separate `uniqueKey` mechanism with TTL-based expiry. The `id` field is required:

```ts
// TTL-based deduplication - unique key expires after 1 hour
await queue.add('notification', { userId: '123' }, {
  deduplication: {
    id: 'notify-123', // Required: unique deduplication key
    ttl: 3600000,     // 1 hour in ms
  },
});

// After TTL expires, the same id can create a new job
// This is useful for rate-limiting or time-windowed deduplication
```
### Extend Strategy

The `extend` strategy resets the TTL of an existing job when a duplicate is detected. The existing job is returned (not replaced), but its deduplication window is extended:

```ts
// Extend strategy - reset TTL if duplicate, return existing job
await queue.add('rate-limited-task', { action: 'sync' }, {
  deduplication: {
    id: 'sync-task', // Required: unique deduplication key
    ttl: 60000,
    extend: true,    // Extend TTL on duplicate
  },
});
```
### Replace Strategy

The `replace` strategy removes the existing job and inserts a new one with the updated data. This is useful when you always want the latest data to be processed:

```ts
// Replace strategy - remove old job, insert new one
await queue.add('latest-data', { data: newData }, {
  deduplication: {
    id: 'data-job', // Required: unique deduplication key
    ttl: 300000,
    replace: true,  // Replace existing job with new data
  },
});
```
### Deduplication Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| `id` | string | - | Required. Unique deduplication key |
| `ttl` | number | - | Time in ms before the unique key expires |
| `extend` | boolean | false | Reset TTL on duplicate (returns existing job) |
| `replace` | boolean | false | Remove the old job and create a new one |
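The three behaviors can be summarized with a small model of the decision logic, assuming a simple in-memory key store. This is illustrative only, not bunqueue's implementation:

```ts
// Illustrative model of TTL-based deduplication decisions (not bunqueue's code).
type DedupEntry = { jobId: string; expiresAt: number };

function dedupe(
  store: Map<string, DedupEntry>,
  key: string,
  newJobId: string,
  ttl: number,
  now: number,
  opts: { extend?: boolean; replace?: boolean } = {},
): string {
  const entry = store.get(key);
  if (entry && entry.expiresAt > now) {
    if (opts.replace) {
      // Replace: drop the old job, insert the new one with a fresh window.
      store.set(key, { jobId: newJobId, expiresAt: now + ttl });
      return newJobId;
    }
    if (opts.extend) {
      // Extend: keep the old job but reset its TTL window.
      entry.expiresAt = now + ttl;
    }
    return entry.jobId; // Default: return the existing job unchanged.
  }
  // No live entry: create the job and record the key.
  store.set(key, { jobId: newJobId, expiresAt: now + ttl });
  return newJobId;
}
```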
## Query Operations

```ts
// Get job by ID
const job = await queue.getJob('job-id');

// Get job state
const state = await queue.getJobState('job-id');
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'

// Get job counts (sync - embedded mode only)
const counts = queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 3 }

// Get job counts (async - works with TCP)
const countsAsync = await queue.getJobCountsAsync();

// Get jobs with filtering (sync - embedded mode only)
const jobs = queue.getJobs({ state: 'waiting', start: 0, end: 10 });

// Get jobs with filtering (async - works with TCP)
const failedJobs = await queue.getJobsAsync({ state: 'failed', start: 0, end: 50 });

// Get counts grouped by priority
const byPriority = queue.getCountsPerPriority();
// { 0: 50, 10: 20, 100: 5 }

// Async version
const byPriorityAsync = await queue.getCountsPerPriorityAsync();
```
### Jobs by State

```ts
// Sync (embedded mode only)
const waiting = queue.getWaiting(0, 10);
const active = queue.getActive(0, 10);
const completed = queue.getCompleted(0, 10);
const failed = queue.getFailed(0, 10);
const delayed = queue.getDelayed(0, 10);
```

```ts
// Async (works with TCP)
const waiting = await queue.getWaitingAsync(0, 10);
const active = await queue.getActiveAsync(0, 10);
const completed = await queue.getCompletedAsync(0, 10);
const failed = await queue.getFailedAsync(0, 10);
const delayed = await queue.getDelayedAsync(0, 10);
```
### Count Methods

```ts
// Sync (embedded mode only)
const waitingCount = queue.getWaitingCount();
const activeCount = queue.getActiveCount();
const completedCount = queue.getCompletedCount();
const failedCount = queue.getFailedCount();
const delayedCount = queue.getDelayedCount();
const total = queue.count();

// Async (works with TCP)
const totalAsync = await queue.countAsync();

// Check if paused
const paused = queue.isPaused();                 // sync
const pausedAsync = await queue.isPausedAsync(); // async
```
### BullMQ Compatibility Methods

```ts
// Get waiting jobs sorted by priority (highest first)
const prioritized = await queue.getPrioritized(0, 10);
const prioritizedCount = await queue.getPrioritizedCount();

// Get jobs waiting for children to complete
const waitingChildren = await queue.getWaitingChildren(0, 10);
const waitingChildrenCount = await queue.getWaitingChildrenCount();
```
## Queue Control

```ts
// Pause processing (workers stop pulling)
queue.pause();

// Resume processing
queue.resume();

// Remove all waiting jobs
queue.drain();

// Remove all queue data
queue.obliterate();

// Remove a specific job
queue.remove('job-id');

// Wait until queue/server is ready
await queue.waitUntilReady();

// Close TCP connection (when done)
queue.close();

// Async disconnect (compatibility)
await queue.disconnect();
```
## Clean & Maintenance

```ts
// Remove completed jobs older than 1 hour (sync)
queue.clean(3600000, 100, 'completed');

// Async version (works with TCP)
const removed = await queue.cleanAsync(3600000, 100, 'completed');

// Promote delayed jobs to waiting
queue.promoteJobs({ count: 50 });

// Bulk retry failed or completed jobs
const retried = await queue.retryJobs({ state: 'failed', count: 100 });
```
## Job Progress & Logs

```ts
// Update job progress
await queue.updateJobProgress('job-id', 75);

// Get job logs
const logs = queue.getJobLogs('job-id', 0, 100);

// Add log entry to a job
await queue.addJobLog('job-id', 'Processing step 3 completed');
```
## Job Dependencies

```ts
// Get child job results
const childValues = await queue.getChildrenValues('parent-job-id');

// Get job dependencies info
const deps = await queue.getJobDependencies('job-id');
const depCounts = await queue.getJobDependenciesCount('job-id');

// Get child jobs with filter
const processed = await queue.getDependencies('parent-id', 'processed', 0, 10);
const unprocessed = await queue.getDependencies('parent-id', 'unprocessed', 0, 10);

// Wait for a job to finish
const result = await queue.waitJobUntilFinished('job-id', queueEvents, 30000);
```
## Job State Transitions

```ts
// Move job to completed with return value
await queue.moveJobToCompleted('job-id', { success: true }, token);

// Move job to failed with error
await queue.moveJobToFailed('job-id', new Error('reason'), token);

// Move job back to waiting
await queue.moveJobToWait('job-id', token);

// Move job to delayed with specific timestamp
await queue.moveJobToDelayed('job-id', Date.now() + 60000, token);

// Move job to waiting-for-children state
await queue.moveJobToWaitingChildren('job-id', token);
```
## Rate Limiting & Concurrency

```ts
// Set global concurrency limit (max parallel jobs across all workers)
queue.setGlobalConcurrency(10);
const concurrency = await queue.getGlobalConcurrency();
queue.removeGlobalConcurrency();

// Set global rate limit (max jobs per time window)
queue.setGlobalRateLimit(100, 1000); // 100 jobs per second
const rateLimit = await queue.getGlobalRateLimit();
queue.removeGlobalRateLimit();

// Throttle queue for specified duration
await queue.rateLimit(5000); // pause for 5 seconds

// Check remaining throttle time
const ttl = await queue.getRateLimitTtl();

// Check if queue hit rate/concurrency limit
const maxed = await queue.isMaxed();
```
## Job Schedulers (Repeatable Jobs)

```ts
// Create or update a job scheduler
await queue.upsertJobScheduler('daily-report', {
  pattern: '0 9 * * *', // cron pattern
  // or: every: 3600000, // interval in ms
}, {
  name: 'generate-report',
  data: { type: 'daily' },
});

// Get a scheduler
const scheduler = await queue.getJobScheduler('daily-report');

// List all schedulers
const schedulers = await queue.getJobSchedulers(0, 100);
const schedulerCount = await queue.getJobSchedulersCount();

// Remove a scheduler
await queue.removeJobScheduler('daily-report');
```
## Deduplication Management

```ts
// Look up job ID by deduplication key
const jobId = await queue.getDeduplicationJobId('my-unique-key');

// Remove deduplication key (allows re-adding same jobId)
await queue.removeDeduplicationKey('my-unique-key');
```
## Workers & Metrics

```ts
// List active workers
const workers = await queue.getWorkers();
const workersCount = await queue.getWorkersCount();

// Get historical metrics
const completedMetrics = await queue.getMetrics('completed', 0, 100);
const failedMetrics = await queue.getMetrics('failed', 0, 100);

// Trim event log
await queue.trimEvents(1000);
```
## Stall Configuration

Configure stall detection to recover unresponsive jobs.

```ts
queue.setStallConfig({
  enabled: true,
  stallInterval: 30000, // 30 seconds without heartbeat = stalled
  maxStalls: 3,         // Move to DLQ after 3 stalls
  gracePeriod: 5000,    // 5 second grace period after job starts
});

// Get current config
const config = queue.getStallConfig();
```

See Stall Detection for more details.
## DLQ Operations

```ts
// Configure DLQ
queue.setDlqConfig({
  autoRetry: true,
  autoRetryInterval: 3600000, // 1 hour
  maxAutoRetries: 3,
  maxAge: 604800000,          // 7 days
  maxEntries: 10000,
});

// Get current DLQ config
const dlqConfig = queue.getDlqConfig();

// Get DLQ entries
const entries = queue.getDlq();

// Filter entries
const stalledJobs = queue.getDlq({ reason: 'stalled' });
const recentFails = queue.getDlq({ newerThan: Date.now() - 3600000 });

// Get stats
const stats = queue.getDlqStats();
// { total, byReason, pendingRetry, expired, oldestEntry, newestEntry }

// Retry from DLQ
queue.retryDlq();          // Retry all
queue.retryDlq('job-123'); // Retry specific

// Retry by filter
queue.retryDlqByFilter({ reason: 'timeout', limit: 10 });

// Purge DLQ
queue.purgeDlq();
```

See Dead Letter Queue for more details.
## Retry Completed Jobs

The `retryCompleted()` method re-queues completed jobs for reprocessing. This is useful when you need to re-run a job that completed successfully, for example when business logic changes or you need to regenerate outputs.

```ts
// Retry a specific completed job
const success = queue.retryCompleted('job-id-123');
if (success) {
  console.log('Job re-queued for processing');
}

// Retry all completed jobs (use with caution!)
const count = queue.retryCompleted();
console.log(`Re-queued ${count} completed jobs`);

// Async version for TCP mode
const countAsync = await queue.retryCompletedAsync();
```
## Auto-Batching (TCP Mode)

In TCP mode, `queue.add()` calls are automatically batched into PUSHB (bulk push) commands for higher throughput. This is enabled by default and requires no code changes.

How it works: if no flush is in flight, the add is sent immediately (zero overhead for sequential `await`). If a flush is already in flight, subsequent adds are buffered and sent together when the current flush completes or after `maxDelayMs`, whichever comes first.
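A rough model of this policy, illustrative only and not the actual client: `send` stands in for a PUSHB round-trip, the `maxDelayMs` timer is omitted, and buffered adds resolve on enqueue rather than on server acknowledgement.

```ts
// Illustrative model of the auto-batching policy (not bunqueue's client code).
class AutoBatcher<T> {
  private buffer: T[] = [];
  private inFlight = false;

  constructor(private send: (batch: T[]) => Promise<void>) {}

  async add(item: T): Promise<void> {
    if (!this.inFlight) {
      // No flush in flight: send immediately (fast path for sequential awaits).
      this.inFlight = true;
      try {
        await this.send([item]);
      } finally {
        this.inFlight = false;
        await this.drain(); // Flush anything buffered while we were sending.
      }
      return;
    }
    // A flush is in flight: buffer; sent together when the current flush completes.
    this.buffer.push(item);
  }

  private async drain(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.inFlight = true;
    try {
      await this.send(batch); // Single bulk round-trip for all buffered items.
    } finally {
      this.inFlight = false;
      await this.drain();
    }
  }
}
```

Sequential `await add()` calls each find `inFlight === false` and go straight out, while concurrent calls pile into the buffer and leave in one batch, which is exactly the behavior the two examples below demonstrate.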
```ts
// Auto-batching is enabled by default in TCP mode
const queue = new Queue('tasks');

// Sequential: no penalty, each add() sends immediately
for (const item of items) {
  await queue.add('task', item);
}

// Concurrent: adds batch into a single PUSHB round-trip
await Promise.all([
  queue.add('a', { x: 1 }),
  queue.add('b', { x: 2 }),
  queue.add('c', { x: 3 }),
]);
```
### Configuration

```ts
const queue = new Queue('tasks', {
  autoBatch: {
    maxSize: 100,   // Flush when buffer reaches this size (default: 50)
    maxDelayMs: 10, // Max ms to wait before flushing (default: 5)
  },
});
```
### Disabling Auto-Batching

```ts
const queue = new Queue('tasks', {
  autoBatch: { enabled: false },
});
```
### Auto-Batch Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| `enabled` | boolean | true | Enable or disable auto-batching |
| `maxSize` | number | 50 | Max items before auto-flush |
| `maxDelayMs` | number | 5 | Max delay in ms before auto-flush |
## Job Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| `priority` | number | 0 | Higher = processed first |
| `delay` | number | 0 | Delay in ms before processing |
| `attempts` | number | 3 | Max retry attempts |
| `backoff` | number | 1000 | Backoff base in ms (exponential, jitter applied) |
| `backoffConfig` | object | - | Advanced backoff: `{ type, delay }` |
| `timeout` | number | - | Processing timeout in ms |
| `jobId` | string | - | Custom ID for deduplication (BullMQ-style idempotent) |
| `deduplication` | object | - | Advanced deduplication config (`ttl`, `extend`, `replace`) |
| `removeOnComplete` | boolean | false | Auto-delete after completion |
| `removeOnFail` | boolean | false | Auto-delete after failure |
| `stallTimeout` | number | - | Per-job stall timeout override |
| `repeat` | object | - | Repeating job config |
| `durable` | boolean | false | Immediate disk write (bypass buffer) |
## Closing

```ts
// Close TCP connection (no-op in embedded mode)
queue.close();
```