bunqueue TCP Protocol Reference: Binary MessagePack Commands
High-performance binary protocol on port 6789 (default). All messages use MessagePack encoding with length-prefixed framing. Supports pipelining for concurrent command processing.
Wire Format
Section titled “Wire Format”Every message (request and response) is wrapped in a length-prefixed frame:
┌──────────────────────┬──────────────────────────────┐│ 4 bytes (Big-Endian │ N bytes (MessagePack payload)││ unsigned 32-bit │ ││ payload length) │ │└──────────────────────┴──────────────────────────────┘The framing protocol works as follows:
- The first 4 bytes are a big-endian unsigned 32-bit integer indicating the length of the MessagePack payload.
- The next N bytes are the MessagePack-encoded command or response object.
- Maximum frame size is 64 MB. Frames exceeding this limit cause the connection to be terminated.
Encoding Example
Section titled “Encoding Example”import { pack, unpack } from 'msgpackr';
// Encode a command into a framed messagefunction frameCommand(cmd: object): Uint8Array { const payload = pack(cmd); const frame = new Uint8Array(4 + payload.length); // Write length prefix (big-endian u32) frame[0] = (payload.length >> 24) & 0xff; frame[1] = (payload.length >> 16) & 0xff; frame[2] = (payload.length >> 8) & 0xff; frame[3] = payload.length & 0xff; frame.set(payload, 4); return frame;}
// Decode a framed responsefunction decodeFrame(frame: Uint8Array): object { return unpack(frame);}Connection
Section titled “Connection”import { pack, unpack } from 'msgpackr';
const socket = await Bun.connect({ hostname: 'localhost', port: 6789, socket: { data(socket, data) { // Parse frames from data, then unpack each frame with msgpackr }, },});
// Send a commandconst cmd = pack({ cmd: 'Ping' });const frame = new Uint8Array(4 + cmd.length);frame[0] = (cmd.length >> 24) & 0xff;frame[1] = (cmd.length >> 16) & 0xff;frame[2] = (cmd.length >> 8) & 0xff;frame[3] = cmd.length & 0xff;frame.set(cmd, 4);socket.write(frame);Protocol Negotiation (Hello)
Section titled “Protocol Negotiation (Hello)”Clients should send a Hello command after connecting to negotiate protocol version and discover server capabilities.
Request:
{ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }Response:
{ ok: true, protocolVersion: 2, capabilities: ['pipelining'], server: 'bunqueue', version: '2.1.8' // Server version string}The current protocol version is 2. The only supported capability is pipelining.
Pipelining
Section titled “Pipelining”The server supports pipelining: clients can send multiple commands without waiting for each response. The server processes frames in parallel with a concurrency limit of 50 commands per connection, controlled by a semaphore.
To correlate responses with requests when pipelining, include a reqId field in each command. The server echoes reqId back in the corresponding response.
// Send two commands simultaneouslysocket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'a@b.com' }, reqId: '1' }));socket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'c@d.com' }, reqId: '2' }));
// Responses may arrive in any order - match by reqId// { ok: true, id: 'abc-123', reqId: '1' }// { ok: true, id: 'def-456', reqId: '2' }Authentication
Section titled “Authentication”When the server is configured with AUTH_TOKENS, all connections must authenticate before sending other commands. The Auth command is always permitted regardless of authentication state.
Request:
{ cmd: 'Auth', token: 'your-secret-token' }Response (success):
{ ok: true }Response (failure):
{ ok: false, error: 'Invalid token' }If auth tokens are configured and a client sends any command before authenticating, the server responds with:
{ ok: false, error: 'Not authenticated' }Response Format
Section titled “Response Format”All responses include an ok boolean field. On success ok is true with command-specific data. On failure ok is false with an error string.
// Success{ ok: true, ...data, reqId?: string }
// Error{ ok: false, error: 'Error message', reqId?: string }Connection Lifecycle
Section titled “Connection Lifecycle”When a TCP connection closes, the server automatically releases all jobs that were being processed by that client back to their queues. This uses retry logic with exponential backoff (up to 3 attempts) to ensure jobs are not left in an inconsistent state.
Rate Limiting
Section titled “Rate Limiting”Each connection is subject to server-side rate limiting. If exceeded, the server responds with:
{ ok: false, error: 'Rate limit exceeded' }Command Reference
Section titled “Command Reference”Every command object must include a cmd field. An optional reqId field can be included for request-response correlation (required for pipelining).
Core Commands
Section titled “Core Commands”Add a single job to a queue.
Request:
{ cmd: 'PUSH', queue: string, // Queue name (required, max 256 chars, alphanumeric/underscore/dash/dot/colon) data: any, // Job payload (required, max 10 MB) priority?: number, // Higher = processed sooner (default: 0, range: -1000000 to 1000000) delay?: number, // Delay in ms before processing (default: 0, max: 1 year) maxAttempts?: number, // Max retry attempts (default: 3, range: 1-1000) backoff?: number, // Retry backoff delay in ms (default: 1000, max: 1 day) ttl?: number, // Time-to-live in ms (max: 1 year) timeout?: number, // Processing timeout in ms (max: 1 day) uniqueKey?: string, // Deduplication key jobId?: string, // Custom job ID (idempotent) dependsOn?: string[], // Job IDs this job depends on tags?: string[], // Metadata tags groupId?: string, // Job group identifier lifo?: boolean, // Last-in-first-out (default: false) removeOnComplete?: boolean, // Auto-remove on completion (default: false) removeOnFail?: boolean, // Auto-remove on failure (default: false) durable?: boolean, // Force immediate disk write, bypassing write buffer (default: false) repeat?: { // Repeat configuration every?: number, // Repeat interval in ms limit?: number, // Max repetitions count?: number // Current count }}Response:
{ ok: true, id: string } // The generated job ID (UUIDv7)Batch push multiple jobs to a queue.
Request:
{ cmd: 'PUSHB', queue: string, jobs: Array<{ data: any, priority?: number, delay?: number, maxAttempts?: number, backoff?: number, ttl?: number, timeout?: number, uniqueKey?: string, customId?: string, tags?: string[], groupId?: string, lifo?: boolean, removeOnComplete?: boolean, removeOnFail?: boolean, durable?: boolean }>}Response:
{ ok: true, ids: string[] } // Array of generated job IDsPull the next available job from a queue. Supports optional long polling and lock-based ownership.
Request:
{ cmd: 'PULL', queue: string, timeout?: number, // Long poll timeout in ms (0-60000, default: 0) owner?: string, // Client identifier for lock-based pull lockTtl?: number // Lock TTL in ms (default: 30000)}Response (without owner):
{ ok: true, job: Job | null }Response (with owner — includes lock token):
{ ok: true, job: Job | null, token: string | null }The token must be passed to ACK or FAIL to verify ownership.
Batch pull multiple jobs from a queue.
Request:
{ cmd: 'PULLB', queue: string, count: number, // Number of jobs to pull (1-1000) timeout?: number, // Long poll timeout in ms (0-60000) owner?: string, // Client identifier for lock-based pull lockTtl?: number // Lock TTL in ms (default: 30000)}Response (without owner):
{ ok: true, jobs: Job[] }Response (with owner — includes lock tokens):
{ ok: true, jobs: Job[], tokens: string[] }Acknowledge a job as completed.
Request:
{ cmd: 'ACK', id: string, // Job ID result?: any, // Optional result data token?: string // Lock token (required if pulled with owner)}Response:
{ ok: true }Batch acknowledge multiple jobs.
Request:
{ cmd: 'ACKB', ids: string[], // Job IDs results?: any[], // Optional results (same order as ids; if provided, length must match ids) tokens?: string[] // Lock tokens (same order as ids)}Response:
{ ok: true }Mark a job as failed. The job will be retried with exponential backoff if it has remaining attempts, otherwise it is moved to the dead-letter queue.
Request:
{ cmd: 'FAIL', id: string, // Job ID error?: string, // Error message token?: string // Lock token (required if pulled with owner)}Response:
{ ok: true }Query Commands
Section titled “Query Commands”GetJob
Section titled “GetJob”Retrieve a job by its internal ID.
Request:
{ cmd: 'GetJob', id: string }Response:
{ ok: true, job: Job }Returns an error if the job is not found.
GetState
Section titled “GetState”Get the current state of a job.
Request:
{ cmd: 'GetState', id: string }Response:
{ ok: true, id: string, state: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' }GetResult
Section titled “GetResult”Get the stored result of a completed job.
Request:
{ cmd: 'GetResult', id: string }Response:
{ ok: true, id: string, result: any }The result field is the value passed via ACK. It may be null or undefined if no result was stored or if the result has been evicted from the LRU cache.
GetJobs
Section titled “GetJobs”List jobs with filtering and pagination.
Request:
{ cmd: 'GetJobs', queue: string, state?: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed', limit?: number, // Max results (default: 100) offset?: number // Skip N results (default: 0)}Response:
{ ok: true, jobs: Job[] }GetJobCounts
Section titled “GetJobCounts”Get job counts grouped by state for a specific queue.
Request:
{ cmd: 'GetJobCounts', queue: string }Response:
{ ok: true, counts: { waiting: number, delayed: number, active: number, completed: number, failed: number }}GetCountsPerPriority
Section titled “GetCountsPerPriority”Get job counts grouped by priority level for a specific queue.
Request:
{ cmd: 'GetCountsPerPriority', queue: string }Response:
{ ok: true, queue: string, counts: Record<number, number> }GetJobByCustomId
Section titled “GetJobByCustomId”Look up a job by its custom ID (the jobId field from PUSH).
Request:
{ cmd: 'GetJobByCustomId', customId: string }Response:
{ ok: true, job: Job }Returns an error if no job with that custom ID exists.
Get the total number of jobs in a queue (all states).
Request:
{ cmd: 'Count', queue: string }Response:
{ ok: true, count: number }GetProgress
Section titled “GetProgress”Get the progress of an active job.
Request:
{ cmd: 'GetProgress', id: string }Response:
{ ok: true, progress: number, message: string | null }GetChildrenValues
Section titled “GetChildrenValues”Get the return values from all child jobs of a parent job. Used with FlowProducer workflows to retrieve results from completed children.
Request:
{ cmd: 'GetChildrenValues', id: string }Response:
{ ok: true, data: { values: Record<string, any> } }Returns an empty values object if the job has no children or if an error occurs.
Control Commands
Section titled “Control Commands”Cancel
Section titled “Cancel”Cancel a waiting or delayed job.
Request:
{ cmd: 'Cancel', id: string }Response:
{ ok: true }Progress
Section titled “Progress”Update the progress of an active job.
Request:
{ cmd: 'Progress', id: string, progress: number, // 0-100 message?: string // Optional progress message}Response:
{ ok: true }Update
Section titled “Update”Update the data payload of an existing job.
Request:
{ cmd: 'Update', id: string, data: any // New job data}Response:
{ ok: true }ChangePriority
Section titled “ChangePriority”Change the priority of a queued job.
Request:
{ cmd: 'ChangePriority', id: string, priority: number}Response:
{ ok: true }Promote
Section titled “Promote”Move a delayed job to the waiting state immediately.
Request:
{ cmd: 'Promote', id: string }Response:
{ ok: true }MoveToDelayed
Section titled “MoveToDelayed”Move an active job back to the delayed state.
Request:
{ cmd: 'MoveToDelayed', id: string, delay: number // Delay in ms from now}Response:
{ ok: true }Discard
Section titled “Discard”Discard a job by moving it to the dead-letter queue.
Request:
{ cmd: 'Discard', id: string }Response:
{ ok: true }WaitJob
Section titled “WaitJob”Wait for a job to complete. This is event-driven (no polling). Returns immediately if the job is already completed.
Request:
{ cmd: 'WaitJob', id: string, timeout?: number // Max wait time in ms (default: 30000)}Response:
{ ok: true, completed: boolean, result?: any }Pause a queue. Workers will stop pulling new jobs.
Request:
{ cmd: 'Pause', queue: string }Response:
{ ok: true }Resume
Section titled “Resume”Resume a paused queue.
Request:
{ cmd: 'Resume', queue: string }Response:
{ ok: true }IsPaused
Section titled “IsPaused”Check whether a queue is currently paused.
Request:
{ cmd: 'IsPaused', queue: string }Response:
{ ok: true, paused: boolean }Remove all waiting jobs from a queue.
Request:
{ cmd: 'Drain', queue: string }Response:
{ ok: true, count: number }Obliterate
Section titled “Obliterate”Remove all data for a queue (all jobs in all states).
Request:
{ cmd: 'Obliterate', queue: string }Response:
{ ok: true }Remove jobs older than a grace period, optionally filtered by state.
Request:
{ cmd: 'Clean', queue: string, grace: number, // Grace period in ms - jobs older than this are removed state?: string, // Filter by state (optional) limit?: number // Max jobs to remove (optional)}Response:
{ ok: true, count: number }ListQueues
Section titled “ListQueues”List all known queues with their status.
Request:
{ cmd: 'ListQueues' }Response:
{ ok: true, queues: Array<{ name: string, waiting: number, delayed: number, active: number, paused: boolean }>}DLQ Commands
Section titled “DLQ Commands”Retrieve jobs from the dead-letter queue.
Request:
{ cmd: 'Dlq', queue: string, count?: number // Max entries to return (optional)}Response:
{ ok: true, jobs: Job[] }RetryDlq
Section titled “RetryDlq”Retry jobs from the dead-letter queue (move them back to waiting).
Request:
{ cmd: 'RetryDlq', queue: string, jobId?: string // Retry a specific job (optional; omit to retry all)}Response:
{ ok: true, count: number } // Number of jobs retriedPurgeDlq
Section titled “PurgeDlq”Clear all jobs from the dead-letter queue.
Request:
{ cmd: 'PurgeDlq', queue: string }Response:
{ ok: true, count: number } // Number of jobs purgedRetryCompleted
Section titled “RetryCompleted”Re-queue completed jobs back to waiting state.
Request:
{ cmd: 'RetryCompleted', queue: string, id?: string // Retry a specific job (optional; omit to retry all)}Response:
{ ok: true, count: number }Cron Commands
Section titled “Cron Commands”Create or update a cron/repeating job schedule.
Request:
{ cmd: 'Cron', name: string, // Unique cron job name queue: string, // Target queue data: any, // Job data payload schedule?: string, // Cron expression (e.g., '*/5 * * * *') repeatEvery?: number, // Repeat interval in ms (alternative to schedule) priority?: number, // Job priority maxLimit?: number, // Max executions timezone?: string // IANA timezone (e.g., 'Europe/Rome', 'America/New_York')}Response:
{ ok: true, cron: { name: string, queue: string, schedule: string | null, repeatEvery: number | null, nextRun: number, timezone: string | undefined }}CronDelete
Section titled “CronDelete”Delete a cron job schedule by name.
Request:
{ cmd: 'CronDelete', name: string }Response:
{ ok: true }CronList
Section titled “CronList”List all registered cron job schedules.
Request:
{ cmd: 'CronList' }Response:
{ ok: true, crons: Array<{ name: string, queue: string, schedule: string | null, repeatEvery: number | null, nextRun: number, executions: number, maxLimit: number | undefined, timezone: string | undefined }>}CronGet
Section titled “CronGet”Get a single cron job by name.
Request:
{ cmd: 'CronGet', name: string }Response:
{ ok: true, cron: { name: string, queue: string, schedule: string | null, repeatEvery: number | null, nextRun: number, executions: number, maxLimit: number | undefined, timezone: string | undefined }}Returns an error if the cron job is not found.
Monitoring Commands
Section titled “Monitoring Commands”Connection health check.
Request:
{ cmd: 'Ping' }Response:
{ ok: true, data: { pong: true, time: number } }Protocol version negotiation and server capability discovery. See the Protocol Negotiation section above for details.
Request:
{ cmd: 'Hello', protocolVersion: number, capabilities?: ['pipelining']}Response:
{ ok: true, protocolVersion: number, capabilities: ['pipelining'], server: 'bunqueue', version: string}Get high-level server statistics.
Request:
{ cmd: 'Stats' }Response:
{ ok: true, stats: { queued: number, // Waiting jobs processing: number, // Active jobs delayed: number, // Delayed jobs dlq: number, // Dead-letter queue size completed: number, // Completed count uptime: number, // Server uptime in ms pushPerSec: number, // Push throughput pullPerSec: number // Pull throughput }}Metrics
Section titled “Metrics”Get detailed server metrics.
Request:
{ cmd: 'Metrics' }Response:
{ ok: true, metrics: { totalPushed: number, totalPulled: number, totalCompleted: number, totalFailed: number, avgLatencyMs: number, avgProcessingMs: number, memoryUsageMb: number, sqliteSizeMb: number, activeConnections: number }}Prometheus
Section titled “Prometheus”Get metrics in Prometheus text exposition format.
Request:
{ cmd: 'Prometheus' }Response:
{ ok: true, data: { metrics: string } }StorageStatus
Section titled “StorageStatus”Get the storage/disk health status. Reports whether the disk is full or has errors.
Request:
{ cmd: 'StorageStatus' }Response:
{ ok: true, data: { diskFull: boolean, // Whether the disk is full error: string | null, // Error message if any since: number | null // Timestamp when the issue started (ms since epoch) }}Heartbeat
Section titled “Heartbeat”Send a heartbeat for a registered worker (keeps the worker registration alive).
Request:
{ cmd: 'Heartbeat', id: string } // Worker IDResponse:
{ ok: true, data: { ok: true } }JobHeartbeat
Section titled “JobHeartbeat”Send a heartbeat for an active job (prevents stall detection from marking it as stalled). Also renews the lock if a token is provided.
Request:
{ cmd: 'JobHeartbeat', id: string, // Job ID token?: string // Lock token for renewal}Response:
{ ok: true, data: { ok: true } }JobHeartbeatB
Section titled “JobHeartbeatB”Batch job heartbeat for multiple active jobs.
Request:
{ cmd: 'JobHeartbeatB', ids: string[], // Job IDs tokens?: string[] // Lock tokens (same order as ids)}Response:
{ ok: true, data: { ok: true, count: number } }Worker Commands
Section titled “Worker Commands”RegisterWorker
Section titled “RegisterWorker”Register a worker with the server for monitoring.
Request:
{ cmd: 'RegisterWorker', name: string, queues: string[] // Queues this worker processes}Response:
{ ok: true, data: { workerId: string, name: string, queues: string[], registeredAt: number }}UnregisterWorker
Section titled “UnregisterWorker”Remove a worker registration.
Request:
{ cmd: 'UnregisterWorker', workerId: string }Response:
{ ok: true, data: { removed: true } }ListWorkers
Section titled “ListWorkers”List all registered workers and their stats.
Request:
{ cmd: 'ListWorkers' }Response:
{ ok: true, data: { workers: Array<{ id: string, name: string, queues: string[], registeredAt: number, lastSeen: number, activeJobs: number, processedJobs: number, failedJobs: number }>, stats: object // Aggregated worker stats }}Webhook Commands
Section titled “Webhook Commands”AddWebhook
Section titled “AddWebhook”Register a webhook to receive event notifications. URLs are validated to prevent SSRF (localhost, private IPs, and cloud metadata endpoints are blocked).
Request:
{ cmd: 'AddWebhook', url: string, // Webhook URL (https required for production) events: string[], // Event types to subscribe to queue?: string, // Filter by queue (optional) secret?: string // Signing secret for payload verification}Response:
{ ok: true, data: { webhookId: string, url: string, events: string[], queue: string | undefined, createdAt: number }}RemoveWebhook
Section titled “RemoveWebhook”Remove a registered webhook.
Request:
{ cmd: 'RemoveWebhook', webhookId: string }Response:
{ ok: true, data: { removed: true } }ListWebhooks
Section titled “ListWebhooks”List all registered webhooks.
Request:
{ cmd: 'ListWebhooks' }Response:
{ ok: true, data: { webhooks: Array<{ id: string, url: string, events: string[], queue: string | undefined, createdAt: number, lastTriggered: number | null, successCount: number, failureCount: number, enabled: boolean }>, stats: object }}Rate Limiting Commands
Section titled “Rate Limiting Commands”RateLimit
Section titled “RateLimit”Set a rate limit on a queue (max jobs processed per second).
Request:
{ cmd: 'RateLimit', queue: string, limit: number // Jobs per second}Response:
{ ok: true }RateLimitClear
Section titled “RateLimitClear”Remove the rate limit from a queue.
Request:
{ cmd: 'RateLimitClear', queue: string }Response:
{ ok: true }SetConcurrency
Section titled “SetConcurrency”Set a concurrency limit on a queue (max concurrent active jobs).
Request:
{ cmd: 'SetConcurrency', queue: string, limit: number}Response:
{ ok: true }ClearConcurrency
Section titled “ClearConcurrency”Remove the concurrency limit from a queue.
Request:
{ cmd: 'ClearConcurrency', queue: string }Response:
{ ok: true }Log Commands
Section titled “Log Commands”AddLog
Section titled “AddLog”Add a log entry to a job.
Request:
{ cmd: 'AddLog', id: string, // Job ID message: string, // Log message level?: 'info' | 'warn' | 'error' // Log level (default: 'info')}Response:
{ ok: true, data: { added: true } }GetLogs
Section titled “GetLogs”Get all log entries for a job.
Request:
{ cmd: 'GetLogs', id: string }Response:
{ ok: true, data: { logs: Array<{ message: string, level: string, timestamp: number }> } }Queue Name Validation
Section titled “Queue Name Validation”Queue names must satisfy the following constraints:
- Not empty and at most 256 characters
- Only alphanumeric characters, underscores, dashes, dots, and colons:
[a-zA-Z0-9_\-.:]+
Job Data Limits
Section titled “Job Data Limits”Job data payloads are limited to 10 MB when serialized.
Command Summary
Section titled “Command Summary”| Category | Command | Description |
|---|---|---|
| Core | PUSH | Add a job to a queue |
PUSHB | Batch push multiple jobs | |
PULL | Pull next job (supports long poll and locks) | |
PULLB | Batch pull jobs | |
ACK | Acknowledge job completion | |
ACKB | Batch acknowledge | |
FAIL | Mark job as failed | |
| Query | GetJob | Get job by ID |
GetState | Get job state | |
GetResult | Get job result | |
GetJobs | List jobs with filtering | |
GetJobCounts | Count jobs by state | |
GetCountsPerPriority | Count jobs by priority | |
GetJobByCustomId | Look up job by custom ID | |
Count | Total job count for a queue | |
GetProgress | Get job progress | |
GetChildrenValues | Get child job return values | |
| Control | Cancel | Cancel a job |
Progress | Update job progress | |
Update | Update job data | |
ChangePriority | Change job priority | |
Promote | Move delayed job to waiting | |
MoveToDelayed | Move active job to delayed | |
Discard | Move job to DLQ | |
WaitJob | Wait for job completion | |
Pause | Pause a queue | |
Resume | Resume a queue | |
IsPaused | Check if queue is paused | |
Drain | Remove all waiting jobs | |
Obliterate | Remove all queue data | |
Clean | Remove old jobs | |
ListQueues | List all queues | |
| DLQ | Dlq | Get DLQ entries |
RetryDlq | Retry DLQ jobs | |
PurgeDlq | Clear DLQ | |
RetryCompleted | Re-queue completed jobs | |
| Cron | Cron | Create/update cron schedule |
CronDelete | Delete cron schedule | |
CronList | List cron schedules | |
CronGet | Get cron schedule by name | |
| Monitoring | Ping | Health check |
Hello | Protocol negotiation | |
Stats | Server statistics | |
Metrics | Detailed metrics | |
Prometheus | Prometheus-format metrics | |
StorageStatus | Get storage/disk health status | |
Heartbeat | Worker heartbeat | |
JobHeartbeat | Job heartbeat (stall prevention) | |
JobHeartbeatB | Batch job heartbeat | |
| Workers | RegisterWorker | Register a worker |
UnregisterWorker | Unregister a worker | |
ListWorkers | List workers | |
| Webhooks | AddWebhook | Register a webhook |
RemoveWebhook | Remove a webhook | |
ListWebhooks | List webhooks | |
| Rate | RateLimit | Set queue rate limit |
RateLimitClear | Clear queue rate limit | |
SetConcurrency | Set queue concurrency limit | |
ClearConcurrency | Clear concurrency limit | |
| Logs | AddLog | Add job log entry |
GetLogs | Get job logs | |
| Auth | Auth | Authenticate connection |