Queue API
The Queue class is used to add and manage jobs.
Creating a Queue
import { Queue } from 'bunqueue/client';

// Basic queue - embedded mode
const queue = new Queue('my-queue', { embedded: true });

// Typed queue
interface TaskData {
  userId: number;
  action: string;
}
const typedQueue = new Queue<TaskData>('tasks', { embedded: true });

// With default job options
const queue = new Queue('emails', {
  embedded: true,
  defaultJobOptions: {
    attempts: 3,
    backoff: 1000,
    removeOnComplete: true,
  }
});

TCP Mode (Server)

// Connect to bunqueue server (no embedded option)
const queue = new Queue('tasks');

// With custom connection
const queue = new Queue('tasks', {
  connection: {
    host: '192.168.1.100',
    port: 6789,
    token: 'secret-token',
    poolSize: 4, // Connection pool size
  }
});

Adding Jobs
Single Job
const job = await queue.add('job-name', { key: 'value' });
// With options
const job = await queue.add('job-name', data, {
  priority: 10,           // Higher = processed first
  delay: 5000,            // Delay in ms before processing
  attempts: 5,            // Max retry attempts (default: 3)
  backoff: 2000,          // Backoff between retries (default: 1000ms)
  timeout: 30000,         // Job timeout in ms
  jobId: 'custom-id',     // Custom job ID for deduplication (BullMQ-style)
  removeOnComplete: true, // Remove job data after completion
  removeOnFail: false,    // Keep failed jobs
  stallTimeout: 60000,    // Per-job stall timeout (overrides queue config)
});

Bulk Add

// Batch optimized - single lock, batch INSERT
const jobs = await queue.addBulk([
  { name: 'task-1', data: { id: 1 } },
  { name: 'task-2', data: { id: 2 }, opts: { priority: 10 } },
  { name: 'task-3', data: { id: 3 }, opts: { delay: 5000 } },
]);

Repeatable Jobs
// Repeat every 5 seconds
await queue.add('heartbeat', {}, {
  repeat: {
    every: 5000,
  }
});

// Repeat with limit
await queue.add('daily-report', {}, {
  repeat: {
    every: 86400000, // 24 hours
    limit: 30,       // Max 30 repetitions
  }
});

// Cron pattern (server mode)
await queue.add('weekly', {}, {
  repeat: {
    pattern: '0 9 * * MON', // Every Monday at 9am
  }
});

Durable Jobs
By default, bunqueue uses a write buffer for high throughput: jobs are batched in memory and flushed to SQLite every 10ms. This achieves ~100k jobs/sec but means jobs could be lost if the process crashes before the buffer is flushed.
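To make the trade-off concrete, here is a minimal sketch of the loss window, assuming a simplified in-memory model. `WriteBufferModel` and its methods are hypothetical names used for illustration and are not part of bunqueue. The real buffer flushes on a timer, whereas this model flushes only when `flush()` is called, so the timing is explicit.

```typescript
// Hypothetical model of a batched write buffer; not bunqueue's actual code.
interface BufferedJob {
  id: string;
  data: unknown;
}

class WriteBufferModel {
  private pending: BufferedJob[] = [];   // accepted, but not yet persisted
  private persisted: BufferedJob[] = []; // stands in for rows written to SQLite

  // Buffered path: the job is accepted before it reaches disk.
  add(job: BufferedJob): void {
    this.pending.push(job);
  }

  // Stands in for one flush tick (every 10ms in the description above).
  flush(): number {
    const flushed = this.pending.length;
    this.persisted.push(...this.pending);
    this.pending = [];
    return flushed;
  }

  // Simulates a process crash: anything still buffered is lost.
  crash(): BufferedJob[] {
    const lost = this.pending;
    this.pending = [];
    return lost;
  }

  persistedIds(): string[] {
    return this.persisted.map((job) => job.id);
  }
}

const buffer = new WriteBufferModel();
buffer.add({ id: 'a', data: {} });
buffer.flush();                    // 'a' has been written
buffer.add({ id: 'b', data: {} }); // 'b' exists only in memory
const lostJobs = buffer.crash();   // crash before the next flush
```

In this model only the job added after the last flush is lost, which is the window the paragraph above describes.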
For critical jobs where data loss is unacceptable, use the durable option:
// Critical job: immediate disk write, guaranteed persistence
await queue.add('process-payment', { orderId: '123', amount: 99.99 }, {
  durable: true,
});

// Batch of critical jobs
await queue.addBulk([
  { name: 'payment-1', data: { orderId: '1' }, opts: { durable: true } },
  { name: 'payment-2', data: { orderId: '2' }, opts: { durable: true } },
]);

Job Deduplication (BullMQ-style)
Use jobId to prevent duplicate jobs. When a job with the same jobId already exists, the existing job is returned instead of creating a duplicate. This is BullMQ-compatible idempotent behavior.
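The idempotent behavior described above can be modeled with a map keyed by job ID. This is a minimal sketch under that assumption, not bunqueue's implementation; `IdempotentQueueModel`, `ModelJob`, and the `auto-` ID prefix are hypothetical names introduced for illustration.

```typescript
// Hypothetical in-memory model of jobId-based idempotent adds.
interface ModelJob<T> {
  id: string;
  name: string;
  data: T;
}

class IdempotentQueueModel<T> {
  private jobs = new Map<string, ModelJob<T>>();
  private counter = 0;

  add(name: string, data: T, opts: { jobId?: string } = {}): ModelJob<T> {
    // Without a custom jobId, every add gets a fresh generated ID.
    const id = opts.jobId ?? `auto-${++this.counter}`;
    const existing = this.jobs.get(id);
    if (existing !== undefined) {
      return existing; // duplicate jobId: return the existing job unchanged
    }
    const job: ModelJob<T> = { id, name, data };
    this.jobs.set(id, job);
    return job;
  }

  size(): number {
    return this.jobs.size;
  }
}

const model = new IdempotentQueueModel<{ orderId: number }>();
const first = model.add('process', { orderId: 123 }, { jobId: 'order-123' });
const second = model.add('process', { orderId: 999 }, { jobId: 'order-123' });
```

Note that in this model the second call's data is discarded: the caller gets back the job created by the first call, so `second.data.orderId` is still 123.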
// Basic deduplication with jobId (BullMQ-style idempotency)
// If a job with the same jobId exists, the existing job is returned instead of creating a duplicate
const job = await queue.add('send-email', { to: 'user@test.com' }, {
  jobId: 'email-user-123'
});

// First call: creates the job
const job1 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123'
});

// Second call with same jobId: returns existing job (no duplicate)
const job2 = await queue.add('process', { orderId: 123 }, {
  jobId: 'order-123'
});

console.log(job1.id === job2.id); // true - same job returned

// Example: Restore jobs on service startup
// SavedJob is an application-defined type with at least `id` and `data` fields
async function restoreJobs(jobsToRestore: SavedJob[]) {
  for (const saved of jobsToRestore) {
    // Safe: existing jobs are returned, not duplicated
    await queue.add('process', saved.data, { jobId: saved.id });
  }
}

Advanced Deduplication
For more control over deduplication behavior, use the deduplication option with TTL-based unique keys and strategies.
TTL-Based Deduplication
By default, jobId deduplication is permanent (until the job is completed or removed). Use deduplication.ttl to make the unique key expire after a specified time:
// TTL-based deduplication - unique key expires after 1 hour
await queue.add('notification', { userId: '123' }, {
  jobId: 'notify-123',
  deduplication: {
    ttl: 3600000 // 1 hour in ms
  }
});

// After TTL expires, the same jobId can create a new job
// This is useful for rate-limiting or time-windowed deduplication

Extend Strategy
The extend strategy resets the TTL of an existing job when a duplicate is detected. The existing job is returned (not replaced), but its deduplication window is extended:
// Extend strategy - reset TTL if duplicate, return existing job
await queue.add('rate-limited-task', { action: 'sync' }, {
  jobId: 'sync-task',
  deduplication: {
    ttl: 60000,
    extend: true // Extend TTL on duplicate
  }
});

Replace Strategy
The replace strategy removes the existing job and inserts a new one with the updated data. This is useful when you always want the latest data to be processed:
// Replace strategy - remove old job, insert new one
await queue.add('latest-data', { data: newData }, {
  jobId: 'data-job',
  deduplication: {
    ttl: 300000,
    replace: true // Replace existing job with new data
  }
});

Deduplication Options Reference
| Option | Type | Default | Description |
|---|---|---|---|
| ttl | number | - | Time in ms before unique key expires |
| extend | boolean | false | Reset TTL on duplicate (returns existing job) |
| replace | boolean | false | Remove old job and create new one |
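The interaction of the three options can be sketched as a small state model with an explicit clock. This is an illustration under stated assumptions, not bunqueue's code: `DedupModel` and its types are hypothetical, `replace` is assumed to take precedence when both flags are set, and a key without `ttl` never expires here (the model does not track job completion or removal).

```typescript
// Hypothetical model of ttl / extend / replace semantics.
interface DedupOptions {
  ttl?: number;
  extend?: boolean;
  replace?: boolean;
}

interface DedupEntry<T> {
  data: T;
  expiresAt: number; // Infinity when no ttl was given
}

type DedupOutcome = 'created' | 'existing' | 'replaced';

class DedupModel<T> {
  private entries = new Map<string, DedupEntry<T>>();

  add(jobId: string, data: T, now: number, dedup: DedupOptions = {}): DedupOutcome {
    const expiresAt = dedup.ttl === undefined ? Infinity : now + dedup.ttl;
    const current = this.entries.get(jobId);

    // No key, or the key has expired: a new job is created.
    if (current === undefined || now >= current.expiresAt) {
      this.entries.set(jobId, { data, expiresAt });
      return 'created';
    }
    // Replace: drop the old job and store the new data.
    if (dedup.replace) {
      this.entries.set(jobId, { data, expiresAt });
      return 'replaced';
    }
    // Extend: keep the old job, but restart its deduplication window.
    if (dedup.extend) {
      current.expiresAt = expiresAt;
    }
    return 'existing';
  }

  dataFor(jobId: string): T | undefined {
    return this.entries.get(jobId)?.data;
  }
}

const dedupModel = new DedupModel<string>();
const r1 = dedupModel.add('sync', 'v1', 0, { ttl: 60000, extend: true });      // new key
const r2 = dedupModel.add('sync', 'v2', 50000, { ttl: 60000, extend: true });  // window pushed to 110000
const r3 = dedupModel.add('sync', 'v3', 100000, { ttl: 60000 });               // still inside extended window
const r4 = dedupModel.add('sync', 'v4', 100000, { ttl: 60000, replace: true }); // data swapped
const r5 = dedupModel.add('sync', 'v5', 200000);                               // key expired at 160000
```

Passing `now` explicitly keeps the sketch deterministic; a real implementation would read the clock itself.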
Query Operations
// Get job by ID
const job = await queue.getJob('job-id');

// Get job state
const state = await queue.getJobState('job-id');
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'

// Get job counts (sync - embedded mode only)
const counts = queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 3 }

// Get job counts (async - works with TCP)
const counts = await queue.getJobCountsAsync();

// Get jobs with filtering (sync - embedded mode only)
const jobs = queue.getJobs({ state: 'waiting', start: 0, end: 10 });

// Get jobs with filtering (async - works with TCP)
const jobs = await queue.getJobsAsync({ state: 'failed', start: 0, end: 50 });

// Get counts grouped by priority
const byPriority = queue.getCountsPerPriority();
// { 0: 50, 10: 20, 100: 5 }

// Async version
const byPriority = await queue.getCountsPerPriorityAsync();

Jobs by State
// Sync (embedded mode only)
const waiting = queue.getWaiting(0, 10);
const active = queue.getActive(0, 10);
const completed = queue.getCompleted(0, 10);
const failed = queue.getFailed(0, 10);
const delayed = queue.getDelayed(0, 10);

// Async (works with TCP)
const waiting = await queue.getWaitingAsync(0, 10);
const active = await queue.getActiveAsync(0, 10);
const completed = await queue.getCompletedAsync(0, 10);
const failed = await queue.getFailedAsync(0, 10);
const delayed = await queue.getDelayedAsync(0, 10);

Count Methods
// Sync (embedded mode only)
const waitingCount = queue.getWaitingCount();
const activeCount = queue.getActiveCount();
const completedCount = queue.getCompletedCount();
const failedCount = queue.getFailedCount();
const delayedCount = queue.getDelayedCount();
const total = queue.count();

// Async (works with TCP)
const total = await queue.countAsync();

// Check if paused
const paused = queue.isPaused();            // sync
const paused = await queue.isPausedAsync(); // async

BullMQ Compatibility Methods
// Get waiting jobs sorted by priority (highest first)
const prioritized = await queue.getPrioritized(0, 10);
const count = await queue.getPrioritizedCount();

// Get jobs waiting for children to complete
const waitingChildren = await queue.getWaitingChildren(0, 10);
const count = await queue.getWaitingChildrenCount();

Queue Control
// Pause processing (workers stop pulling)
queue.pause();

// Resume processing
queue.resume();

// Remove all waiting jobs
queue.drain();

// Remove all queue data
queue.obliterate();

// Remove a specific job
queue.remove('job-id');

// Wait until queue/server is ready
await queue.waitUntilReady();

// Close TCP connection (when done)
queue.close();

// Async disconnect (compatibility)
await queue.disconnect();

Clean & Maintenance
// Remove completed jobs older than 1 hour (sync)
queue.clean(3600000, 100, 'completed');

// Async version (works with TCP)
const removed = await queue.cleanAsync(3600000, 100, 'completed');

// Promote delayed jobs to waiting
queue.promoteJobs({ count: 50 });

// Bulk retry failed or completed jobs
const retried = await queue.retryJobs({ state: 'failed', count: 100 });

Job Progress & Logs
// Update job progress
await queue.updateJobProgress('job-id', 75);

// Get job logs
const logs = queue.getJobLogs('job-id', 0, 100);

// Add log entry to a job
await queue.addJobLog('job-id', 'Processing step 3 completed');

Job Dependencies
// Get child job results
const childValues = await queue.getChildrenValues('parent-job-id');

// Get job dependencies info
const deps = await queue.getJobDependencies('job-id');
const depCounts = await queue.getJobDependenciesCount('job-id');

// Get child jobs with filter
const processed = await queue.getDependencies('parent-id', 'processed', 0, 10);
const unprocessed = await queue.getDependencies('parent-id', 'unprocessed', 0, 10);

// Wait for a job to finish
const result = await queue.waitJobUntilFinished('job-id', queueEvents, 30000);

Job State Transitions
// Move job to completed with return value
await queue.moveJobToCompleted('job-id', { success: true }, token);

// Move job to failed with error
await queue.moveJobToFailed('job-id', new Error('reason'), token);

// Move job back to waiting
await queue.moveJobToWait('job-id', token);

// Move job to delayed with specific timestamp
await queue.moveJobToDelayed('job-id', Date.now() + 60000, token);

// Move job to waiting-for-children state
await queue.moveJobToWaitingChildren('job-id', token);

Rate Limiting & Concurrency
// Set global concurrency limit (max parallel jobs across all workers)
queue.setGlobalConcurrency(10);
const concurrency = await queue.getGlobalConcurrency();
queue.removeGlobalConcurrency();

// Set global rate limit (max jobs per time window)
queue.setGlobalRateLimit(100, 1000); // 100 jobs per second
const rateLimit = await queue.getGlobalRateLimit();
queue.removeGlobalRateLimit();

// Throttle queue for specified duration
await queue.rateLimit(5000); // pause for 5 seconds

// Check remaining throttle time
const ttl = await queue.getRateLimitTtl();

// Check if queue hit rate/concurrency limit
const maxed = await queue.isMaxed();

Job Schedulers (Repeatable Jobs)
// Create or update a job scheduler
await queue.upsertJobScheduler('daily-report', {
  pattern: '0 9 * * *', // cron pattern
  // or: every: 3600000, // interval in ms
}, {
  name: 'generate-report',
  data: { type: 'daily' },
});

// Get a scheduler
const scheduler = await queue.getJobScheduler('daily-report');

// List all schedulers
const schedulers = await queue.getJobSchedulers(0, 100);
const count = await queue.getJobSchedulersCount();

// Remove a scheduler
await queue.removeJobScheduler('daily-report');

Deduplication Management
// Look up job ID by deduplication key
const jobId = await queue.getDeduplicationJobId('my-unique-key');

// Remove deduplication key (allows re-adding same jobId)
await queue.removeDeduplicationKey('my-unique-key');

Workers & Metrics
// List active workers
const workers = await queue.getWorkers();
const count = await queue.getWorkersCount();

// Get historical metrics
const completedMetrics = await queue.getMetrics('completed', 0, 100);
const failedMetrics = await queue.getMetrics('failed', 0, 100);

// Trim event log
await queue.trimEvents(1000);

Stall Configuration
Configure stall detection to recover unresponsive jobs.
queue.setStallConfig({
  enabled: true,
  stallInterval: 30000, // 30 seconds without heartbeat = stalled
  maxStalls: 3,         // Move to DLQ after 3 stalls
  gracePeriod: 5000,    // 5 second grace period after job starts
});

// Get current config
const config = queue.getStallConfig();

See Stall Detection for more details.
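As a rough illustration of how the three settings might combine, the sketch below classifies a single active job. It is a model built from the comments in the configuration above, not bunqueue's detection logic: `checkStall`, `ActiveJobModel`, and the verdict names are hypothetical, and both the order of the checks and the rule that the stall reaching `maxStalls` goes to the DLQ are assumptions.

```typescript
// Hypothetical stall classification based on the config fields above.
interface StallConfigModel {
  enabled: boolean;
  stallInterval: number; // ms without a heartbeat before a job counts as stalled
  maxStalls: number;     // stalls allowed before the job goes to the DLQ
  gracePeriod: number;   // ms after start during which a job is never stalled
}

interface ActiveJobModel {
  startedAt: number;
  lastHeartbeat: number;
  stallCount: number; // stalls recorded so far
}

type StallVerdict = 'healthy' | 'stalled-retry' | 'stalled-dlq';

function checkStall(job: ActiveJobModel, config: StallConfigModel, now: number): StallVerdict {
  if (!config.enabled) return 'healthy';
  // Jobs that have only just started are given time to send a first heartbeat.
  if (now - job.startedAt < config.gracePeriod) return 'healthy';
  // A recent enough heartbeat means the job is still alive.
  if (now - job.lastHeartbeat < config.stallInterval) return 'healthy';
  // This stall counts too; once the total reaches maxStalls, give up.
  const stalls = job.stallCount + 1;
  return stalls >= config.maxStalls ? 'stalled-dlq' : 'stalled-retry';
}

const stallConfig: StallConfigModel = {
  enabled: true,
  stallInterval: 30000,
  maxStalls: 3,
  gracePeriod: 5000,
};
const freshJob: ActiveJobModel = { startedAt: 0, lastHeartbeat: 0, stallCount: 0 };
const flakyJob: ActiveJobModel = { startedAt: 0, lastHeartbeat: 0, stallCount: 2 };
```

With these values, `checkStall(freshJob, stallConfig, 31000)` classifies the job as a retryable stall, while the same call for `flakyJob` sends it to the DLQ because it would be its third stall.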
DLQ Operations
// Configure DLQ
queue.setDlqConfig({
  autoRetry: true,
  autoRetryInterval: 3600000, // 1 hour
  maxAutoRetries: 3,
  maxAge: 604800000,          // 7 days
  maxEntries: 10000,
});

// Get current DLQ config
const dlqConfig = queue.getDlqConfig();

// Get DLQ entries
const entries = queue.getDlq();

// Filter entries
const stalledJobs = queue.getDlq({ reason: 'stalled' });
const recentFails = queue.getDlq({ newerThan: Date.now() - 3600000 });

// Get stats
const stats = queue.getDlqStats();
// { total, byReason, pendingRetry, expired, oldestEntry, newestEntry }

// Retry from DLQ
queue.retryDlq();          // Retry all
queue.retryDlq('job-123'); // Retry specific

// Retry by filter
queue.retryDlqByFilter({ reason: 'timeout', limit: 10 });

// Purge DLQ
queue.purgeDlq();

See Dead Letter Queue for more details.
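The filter fields used above (`reason`, `newerThan`, `limit`) can be read as a conjunction of conditions followed by a cap on the result size. The sketch below models that reading. The entry shape (`jobId`, `reason`, `enteredAt`), the `filterDlq` helper, and the strict comparison for `newerThan` are assumptions for illustration, not bunqueue's actual types or behavior.

```typescript
// Hypothetical model of DLQ filtering.
interface DlqEntryModel {
  jobId: string;
  reason: string;
  enteredAt: number; // timestamp (ms) when the job entered the DLQ
}

interface DlqFilterModel {
  reason?: string;
  newerThan?: number;
  limit?: number;
}

function filterDlq(entries: DlqEntryModel[], filter: DlqFilterModel = {}): DlqEntryModel[] {
  // Every provided condition must match; omitted fields match everything.
  const matched = entries.filter(
    (entry) =>
      (filter.reason === undefined || entry.reason === filter.reason) &&
      (filter.newerThan === undefined || entry.enteredAt > filter.newerThan),
  );
  // The limit is applied last, after all conditions.
  return filter.limit === undefined ? matched : matched.slice(0, filter.limit);
}

const dlqEntries: DlqEntryModel[] = [
  { jobId: 'a', reason: 'timeout', enteredAt: 1000 },
  { jobId: 'b', reason: 'stalled', enteredAt: 2000 },
  { jobId: 'c', reason: 'timeout', enteredAt: 3000 },
  { jobId: 'd', reason: 'timeout', enteredAt: 4000 },
];

const firstTwoTimeouts = filterDlq(dlqEntries, { reason: 'timeout', limit: 2 });
const recentEntries = filterDlq(dlqEntries, { newerThan: 2000 });
```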
Retry Completed Jobs
The retryCompleted() method allows re-queuing completed jobs for reprocessing. This is useful when you need to re-run a job that completed successfully, for example when business logic changes or you need to regenerate outputs.
// Retry a specific completed job
const success = queue.retryCompleted('job-id-123');
if (success) {
  console.log('Job re-queued for processing');
}

// Retry all completed jobs (use with caution!)
const count = queue.retryCompleted();
console.log(`Re-queued ${count} completed jobs`);

// Async version for TCP mode
const count = await queue.retryCompletedAsync();

Job Options Reference
| Option | Type | Default | Description |
|---|---|---|---|
| priority | number | 0 | Higher = processed first |
| delay | number | 0 | Delay in ms before processing |
| attempts | number | 3 | Max retry attempts |
| backoff | number | 1000 | Backoff base in ms (exponential) |
| timeout | number | - | Processing timeout in ms |
| jobId | string | - | Custom ID for deduplication (BullMQ-style idempotent) |
| deduplication | object | - | Advanced deduplication config (ttl, extend, replace) |
| removeOnComplete | boolean | false | Auto-delete after completion |
| removeOnFail | boolean | false | Auto-delete after failure |
| stallTimeout | number | - | Per-job stall timeout override |
| repeat | object | - | Repeating job config |
| durable | boolean | false | Immediate disk write (bypass buffer) |
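To get a feel for how `attempts` and `backoff` interact, the sketch below lists the wait before each retry. It assumes the common doubling formula (`backoff * 2^n`) and treats `attempts` as the total number of tries, as BullMQ does; the table above only says the backoff is exponential, so the exact formula bunqueue uses may differ. `backoffDelays` is a hypothetical helper, not a library function.

```typescript
// Hypothetical helper: delay (ms) before each retry, assuming doubling backoff.
function backoffDelays(attempts: number, backoff: number): number[] {
  const delays: number[] = [];
  // The first try runs immediately, so there are at most attempts - 1 retries.
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(backoff * Math.pow(2, retry));
  }
  return delays;
}

const defaultSchedule = backoffDelays(3, 1000); // defaults from the table above
const customSchedule = backoffDelays(5, 2000);  // values from the "With options" example
```

Under these assumptions the defaults give retries after 1000 ms and 2000 ms, and `attempts: 5, backoff: 2000` gives 2000, 4000, 8000, and 16000 ms.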
Closing
// Close TCP connection (no-op in embedded mode)
queue.close();