
bunqueue TCP Protocol Reference: Binary MessagePack Commands

High-performance binary protocol on port 6789 (default). All messages use MessagePack encoding with length-prefixed framing. Supports pipelining for concurrent command processing.

Every message (request and response) is wrapped in a length-prefixed frame:

┌──────────────────────┬──────────────────────────────┐
│ 4 bytes (big-endian  │ N bytes (MessagePack payload)│
│ unsigned 32-bit      │                              │
│ payload length)      │                              │
└──────────────────────┴──────────────────────────────┘

The framing protocol works as follows:

  1. The first 4 bytes are a big-endian unsigned 32-bit integer indicating the length of the MessagePack payload.
  2. The next N bytes are the MessagePack-encoded command or response object.
  3. Maximum frame size is 64 MB. Frames exceeding this limit cause the connection to be terminated.
Example framing helpers using msgpackr:

import { pack, unpack } from 'msgpackr';

// Encode a command into a framed message
function frameCommand(cmd: object): Uint8Array {
  const payload = pack(cmd);
  const frame = new Uint8Array(4 + payload.length);
  // Write length prefix (big-endian u32)
  frame[0] = (payload.length >> 24) & 0xff;
  frame[1] = (payload.length >> 16) & 0xff;
  frame[2] = (payload.length >> 8) & 0xff;
  frame[3] = payload.length & 0xff;
  frame.set(payload, 4);
  return frame;
}

// Decode a framed response (the frame includes the 4-byte length prefix)
function decodeFrame(frame: Uint8Array): object {
  // Skip the length prefix before unpacking the MessagePack payload
  return unpack(frame.subarray(4));
}
Connecting and sending a command with Bun's TCP socket API:

import { pack, unpack } from 'msgpackr';

const socket = await Bun.connect({
  hostname: 'localhost',
  port: 6789,
  socket: {
    data(socket, data) {
      // Parse frames from data, then unpack each frame with msgpackr
    },
  },
});

// Send a command
const cmd = pack({ cmd: 'Ping' });
const frame = new Uint8Array(4 + cmd.length);
frame[0] = (cmd.length >> 24) & 0xff;
frame[1] = (cmd.length >> 16) & 0xff;
frame[2] = (cmd.length >> 8) & 0xff;
frame[3] = cmd.length & 0xff;
frame.set(cmd, 4);
socket.write(frame);
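Because TCP is a byte stream, a single data event may carry a partial frame or several complete frames. A minimal sketch of an incremental frame parser (the FrameParser name is ours, not part of any bunqueue client library):

```typescript
// Incremental parser for 4-byte big-endian length-prefixed frames.
// Accumulates bytes across data events and yields complete payloads.
class FrameParser {
  private buffer = new Uint8Array(0);

  // Feed a chunk from the socket; returns every complete payload now available.
  push(chunk: Uint8Array): Uint8Array[] {
    // Append the chunk to any bytes left over from previous events
    const merged = new Uint8Array(this.buffer.length + chunk.length);
    merged.set(this.buffer, 0);
    merged.set(chunk, this.buffer.length);
    this.buffer = merged;

    const payloads: Uint8Array[] = [];
    while (this.buffer.length >= 4) {
      // Read the big-endian u32 length prefix
      const view = new DataView(this.buffer.buffer, this.buffer.byteOffset);
      const len = view.getUint32(0, false);
      if (this.buffer.length < 4 + len) break; // frame incomplete; wait for more data
      payloads.push(this.buffer.subarray(4, 4 + len));
      this.buffer = this.buffer.subarray(4 + len);
    }
    return payloads;
  }
}
```

Each payload returned by push can then be passed to msgpackr's unpack. A production parser would also reject frames whose length prefix exceeds the 64 MB limit before buffering them.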

Clients should send a Hello command after connecting to negotiate protocol version and discover server capabilities.

Request:

{ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }

Response:

{
  ok: true,
  protocolVersion: 2,
  capabilities: ['pipelining'],
  server: 'bunqueue',
  version: '2.1.8' // Server version string
}

The current protocol version is 2. The only supported capability is pipelining.
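As a client-side sketch, the negotiated response can be validated before other commands are sent (the checkHello helper below is illustrative, not part of the protocol):

```typescript
// Shape of the Hello response as documented above
interface HelloResponse {
  ok: boolean;
  protocolVersion: number;
  capabilities: string[];
  server: string;
  version: string;
}

// Verify the server speaks protocol version 2 and report which of the
// requested capabilities it actually supports.
function checkHello(resp: HelloResponse, wanted: string[]): string[] {
  if (!resp.ok || resp.protocolVersion !== 2) {
    throw new Error(`Unsupported protocol version: ${resp.protocolVersion}`);
  }
  return wanted.filter((c) => resp.capabilities.includes(c));
}
```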

The server supports pipelining: clients can send multiple commands without waiting for each response. The server processes frames in parallel with a concurrency limit of 50 commands per connection, controlled by a semaphore.

To correlate responses with requests when pipelining, include a reqId field in each command. The server echoes reqId back in the corresponding response.

// Send two commands simultaneously
socket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'a@b.com' }, reqId: '1' }));
socket.write(frameCommand({ cmd: 'PUSH', queue: 'emails', data: { to: 'c@d.com' }, reqId: '2' }));
// Responses may arrive in any order - match by reqId
// { ok: true, id: 'abc-123', reqId: '1' }
// { ok: true, id: 'def-456', reqId: '2' }
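One way to implement this correlation on the client is a map of pending promises keyed by reqId (a sketch; the PendingMap name and sequential ID scheme are ours, not mandated by the protocol):

```typescript
// Shape of responses relevant to correlation; other fields vary per command.
type Response = { ok: boolean; reqId?: string; id?: string; error?: string };

// Track in-flight requests by reqId and resolve them as responses arrive.
class PendingMap {
  private next = 0;
  private pending = new Map<string, (resp: Response) => void>();

  // Allocate a reqId and a promise that resolves with the matching response.
  register(): { reqId: string; promise: Promise<Response> } {
    const reqId = String(++this.next);
    const promise = new Promise<Response>((resolve) => {
      this.pending.set(reqId, resolve);
    });
    return { reqId, promise };
  }

  // Call for every decoded response frame; dispatches by reqId.
  dispatch(resp: Response): void {
    if (resp.reqId === undefined) return; // no correlation requested
    const resolve = this.pending.get(resp.reqId);
    if (resolve) {
      this.pending.delete(resp.reqId);
      resolve(resp);
    }
  }
}
```

A real client would also reject all pending promises when the connection closes, so callers are not left awaiting forever.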

When the server is configured with AUTH_TOKENS, all connections must authenticate before sending other commands. The Auth command is always permitted regardless of authentication state.

Request:

{ cmd: 'Auth', token: 'your-secret-token' }

Response (success):

{ ok: true }

Response (failure):

{ ok: false, error: 'Invalid token' }

If auth tokens are configured and a client sends any command before authenticating, the server responds with:

{ ok: false, error: 'Not authenticated' }

All responses include an ok boolean field. On success ok is true with command-specific data. On failure ok is false with an error string.

// Success
{ ok: true, ...data, reqId?: string }
// Error
{ ok: false, error: 'Error message', reqId?: string }

When a TCP connection closes, the server automatically releases all jobs that were being processed by that client back to their queues. This uses retry logic with exponential backoff (up to 3 attempts) to ensure jobs are not left in an inconsistent state.

Each connection is subject to server-side rate limiting. If exceeded, the server responds with:

{ ok: false, error: 'Rate limit exceeded' }

Every command object must include a cmd field. An optional reqId field can be included for request-response correlation (required for pipelining).

Add a single job to a queue.

Request:

{
  cmd: 'PUSH',
  queue: string, // Queue name (required, max 256 chars, alphanumeric/underscore/dash/dot/colon)
  data: any, // Job payload (required, max 10 MB)
  priority?: number, // Higher = processed sooner (default: 0, range: -1000000 to 1000000)
  delay?: number, // Delay in ms before processing (default: 0, max: 1 year)
  maxAttempts?: number, // Max retry attempts (default: 3, range: 1-1000)
  backoff?: number, // Retry backoff delay in ms (default: 1000, max: 1 day)
  ttl?: number, // Time-to-live in ms (max: 1 year)
  timeout?: number, // Processing timeout in ms (max: 1 day)
  uniqueKey?: string, // Deduplication key
  jobId?: string, // Custom job ID (idempotent)
  dependsOn?: string[], // Job IDs this job depends on
  tags?: string[], // Metadata tags
  groupId?: string, // Job group identifier
  lifo?: boolean, // Last-in-first-out (default: false)
  removeOnComplete?: boolean, // Auto-remove on completion (default: false)
  removeOnFail?: boolean, // Auto-remove on failure (default: false)
  durable?: boolean, // Force immediate disk write, bypassing write buffer (default: false)
  repeat?: { // Repeat configuration
    every?: number, // Repeat interval in ms
    limit?: number, // Max repetitions
    count?: number // Current count
  }
}

Response:

{ ok: true, id: string } // The generated job ID (UUIDv7)
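Tying the fields together, a thin client helper might look like the following sketch. Here send stands for any function that frames a command, writes it to the socket, and resolves with the correlated response (for example, one built on the pipelining pattern described earlier); pushJob itself is illustrative, not part of the protocol:

```typescript
// Any transport that sends one command and resolves with its response.
type Send = (cmd: object) => Promise<{ ok: boolean; id?: string; error?: string }>;

// Push one job and return its generated ID, surfacing server errors as exceptions.
async function pushJob(
  send: Send,
  queue: string,
  data: unknown,
  opts: { priority?: number; delay?: number; maxAttempts?: number } = {},
): Promise<string> {
  const resp = await send({ cmd: 'PUSH', queue, data, ...opts });
  if (!resp.ok || !resp.id) throw new Error(resp.error ?? 'PUSH failed');
  return resp.id;
}
```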

Batch push multiple jobs to a queue.

Request:

{
  cmd: 'PUSHB',
  queue: string,
  jobs: Array<{
    data: any,
    priority?: number,
    delay?: number,
    maxAttempts?: number,
    backoff?: number,
    ttl?: number,
    timeout?: number,
    uniqueKey?: string,
    customId?: string,
    tags?: string[],
    groupId?: string,
    lifo?: boolean,
    removeOnComplete?: boolean,
    removeOnFail?: boolean,
    durable?: boolean
  }>
}

Response:

{ ok: true, ids: string[] } // Array of generated job IDs

Pull the next available job from a queue. Supports optional long polling and lock-based ownership.

Request:

{
  cmd: 'PULL',
  queue: string,
  timeout?: number, // Long poll timeout in ms (0-60000, default: 0)
  owner?: string, // Client identifier for lock-based pull
  lockTtl?: number // Lock TTL in ms (default: 30000)
}

Response (without owner):

{ ok: true, job: Job | null }

Response (with owner — includes lock token):

{ ok: true, job: Job | null, token: string | null }

The token must be passed to ACK or FAIL to verify ownership.
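The owner/token mechanics suggest a worker loop along these lines (a sketch under the same injected-send assumption as above; workOnce and processOne are our names, not part of the protocol):

```typescript
type Job = { id: string; data: unknown };
type Send = (cmd: object) => Promise<any>;

// Pull one job with an owner lock, process it, then ACK or FAIL with the lock token.
// Returns false when the queue was empty within the long-poll window.
async function workOnce(
  send: Send,
  queue: string,
  owner: string,
  processOne: (job: Job) => Promise<unknown>,
): Promise<boolean> {
  const pulled = await send({ cmd: 'PULL', queue, owner, lockTtl: 30000, timeout: 5000 });
  if (!pulled.ok || pulled.job === null) return false; // nothing to do
  try {
    const result = await processOne(pulled.job);
    // Prove ownership with the lock token when acknowledging
    await send({ cmd: 'ACK', id: pulled.job.id, result, token: pulled.token });
  } catch (err) {
    await send({ cmd: 'FAIL', id: pulled.job.id, error: String(err), token: pulled.token });
  }
  return true;
}
```

For long-running handlers, the loop would also send periodic JobHeartbeat commands with the same token so the lock does not expire mid-processing.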


Batch pull multiple jobs from a queue.

Request:

{
  cmd: 'PULLB',
  queue: string,
  count: number, // Number of jobs to pull (1-1000)
  timeout?: number, // Long poll timeout in ms (0-60000)
  owner?: string, // Client identifier for lock-based pull
  lockTtl?: number // Lock TTL in ms (default: 30000)
}

Response (without owner):

{ ok: true, jobs: Job[] }

Response (with owner — includes lock tokens):

{ ok: true, jobs: Job[], tokens: string[] }

Acknowledge a job as completed.

Request:

{
  cmd: 'ACK',
  id: string, // Job ID
  result?: any, // Optional result data
  token?: string // Lock token (required if pulled with owner)
}

Response:

{ ok: true }

Batch acknowledge multiple jobs.

Request:

{
  cmd: 'ACKB',
  ids: string[], // Job IDs
  results?: any[], // Optional results (same order as ids; if provided, length must match ids)
  tokens?: string[] // Lock tokens (same order as ids)
}

Response:

{ ok: true }

Mark a job as failed. The job will be retried with exponential backoff if it has remaining attempts, otherwise it is moved to the dead-letter queue.

Request:

{
  cmd: 'FAIL',
  id: string, // Job ID
  error?: string, // Error message
  token?: string // Lock token (required if pulled with owner)
}

Response:

{ ok: true }
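The exact retry schedule is server-internal. Purely as an illustration of exponential backoff using the job's backoff base from PUSH, a typical formula doubles the delay per attempt; bunqueue's actual implementation may differ:

```typescript
// Illustrative only: delay = backoff * 2^(attempt - 1).
// This is a common exponential-backoff formula, not bunqueue's documented schedule.
function retryDelay(backoff: number, attempt: number): number {
  return backoff * 2 ** (attempt - 1);
}
```

With the default backoff of 1000 ms, this schedule would retry after 1 s, 2 s, and 4 s for attempts 1 through 3.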

Retrieve a job by its internal ID.

Request:

{ cmd: 'GetJob', id: string }

Response:

{ ok: true, job: Job }

Returns an error if the job is not found.


Get the current state of a job.

Request:

{ cmd: 'GetState', id: string }

Response:

{ ok: true, id: string, state: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' }

Get the stored result of a completed job.

Request:

{ cmd: 'GetResult', id: string }

Response:

{ ok: true, id: string, result: any }

The result field is the value passed via ACK. It may be null or undefined if no result was stored or if the result has been evicted from the LRU cache.


List jobs with filtering and pagination.

Request:

{
  cmd: 'GetJobs',
  queue: string,
  state?: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed',
  limit?: number, // Max results (default: 100)
  offset?: number // Skip N results (default: 0)
}

Response:

{ ok: true, jobs: Job[] }

Get job counts grouped by state for a specific queue.

Request:

{ cmd: 'GetJobCounts', queue: string }

Response:

{
  ok: true,
  counts: {
    waiting: number,
    delayed: number,
    active: number,
    completed: number,
    failed: number
  }
}

Get job counts grouped by priority level for a specific queue.

Request:

{ cmd: 'GetCountsPerPriority', queue: string }

Response:

{ ok: true, queue: string, counts: Record<number, number> }

Look up a job by its custom ID (the jobId field from PUSH).

Request:

{ cmd: 'GetJobByCustomId', customId: string }

Response:

{ ok: true, job: Job }

Returns an error if no job with that custom ID exists.


Get the total number of jobs in a queue (all states).

Request:

{ cmd: 'Count', queue: string }

Response:

{ ok: true, count: number }

Get the progress of an active job.

Request:

{ cmd: 'GetProgress', id: string }

Response:

{ ok: true, progress: number, message: string | null }

Get the return values from all child jobs of a parent job. Used with FlowProducer workflows to retrieve results from completed children.

Request:

{ cmd: 'GetChildrenValues', id: string }

Response:

{ ok: true, data: { values: Record<string, any> } }

Returns an empty values object if the job has no children or if an error occurs.


Cancel a waiting or delayed job.

Request:

{ cmd: 'Cancel', id: string }

Response:

{ ok: true }

Update the progress of an active job.

Request:

{
  cmd: 'Progress',
  id: string,
  progress: number, // 0-100
  message?: string // Optional progress message
}

Response:

{ ok: true }

Update the data payload of an existing job.

Request:

{
  cmd: 'Update',
  id: string,
  data: any // New job data
}

Response:

{ ok: true }

Change the priority of a queued job.

Request:

{
  cmd: 'ChangePriority',
  id: string,
  priority: number
}

Response:

{ ok: true }

Move a delayed job to the waiting state immediately.

Request:

{ cmd: 'Promote', id: string }

Response:

{ ok: true }

Move an active job back to the delayed state.

Request:

{
  cmd: 'MoveToDelayed',
  id: string,
  delay: number // Delay in ms from now
}

Response:

{ ok: true }

Discard a job by moving it to the dead-letter queue.

Request:

{ cmd: 'Discard', id: string }

Response:

{ ok: true }

Wait for a job to complete. This is event-driven (no polling). Returns immediately if the job is already completed.

Request:

{
  cmd: 'WaitJob',
  id: string,
  timeout?: number // Max wait time in ms (default: 30000)
}

Response:

{ ok: true, completed: boolean, result?: any }

Pause a queue. Workers will stop pulling new jobs.

Request:

{ cmd: 'Pause', queue: string }

Response:

{ ok: true }

Resume a paused queue.

Request:

{ cmd: 'Resume', queue: string }

Response:

{ ok: true }

Check whether a queue is currently paused.

Request:

{ cmd: 'IsPaused', queue: string }

Response:

{ ok: true, paused: boolean }

Remove all waiting jobs from a queue.

Request:

{ cmd: 'Drain', queue: string }

Response:

{ ok: true, count: number }

Remove all data for a queue (all jobs in all states).

Request:

{ cmd: 'Obliterate', queue: string }

Response:

{ ok: true }

Remove jobs older than a grace period, optionally filtered by state.

Request:

{
  cmd: 'Clean',
  queue: string,
  grace: number, // Grace period in ms - jobs older than this are removed
  state?: string, // Filter by state (optional)
  limit?: number // Max jobs to remove (optional)
}

Response:

{ ok: true, count: number }

List all known queues with their status.

Request:

{ cmd: 'ListQueues' }

Response:

{
  ok: true,
  queues: Array<{
    name: string,
    waiting: number,
    delayed: number,
    active: number,
    paused: boolean
  }>
}

Retrieve jobs from the dead-letter queue.

Request:

{
  cmd: 'Dlq',
  queue: string,
  count?: number // Max entries to return (optional)
}

Response:

{ ok: true, jobs: Job[] }

Retry jobs from the dead-letter queue (move them back to waiting).

Request:

{
  cmd: 'RetryDlq',
  queue: string,
  jobId?: string // Retry a specific job (optional; omit to retry all)
}

Response:

{ ok: true, count: number } // Number of jobs retried

Clear all jobs from the dead-letter queue.

Request:

{ cmd: 'PurgeDlq', queue: string }

Response:

{ ok: true, count: number } // Number of jobs purged

Re-queue completed jobs back to waiting state.

Request:

{
  cmd: 'RetryCompleted',
  queue: string,
  id?: string // Retry a specific job (optional; omit to retry all)
}

Response:

{ ok: true, count: number }

Create or update a cron/repeating job schedule.

Request:

{
  cmd: 'Cron',
  name: string, // Unique cron job name
  queue: string, // Target queue
  data: any, // Job data payload
  schedule?: string, // Cron expression (e.g., '*/5 * * * *')
  repeatEvery?: number, // Repeat interval in ms (alternative to schedule)
  priority?: number, // Job priority
  maxLimit?: number, // Max executions
  timezone?: string // IANA timezone (e.g., 'Europe/Rome', 'America/New_York')
}

Response:

{
  ok: true,
  cron: {
    name: string,
    queue: string,
    schedule: string | null,
    repeatEvery: number | null,
    nextRun: number,
    timezone: string | undefined
  }
}

Delete a cron job schedule by name.

Request:

{ cmd: 'CronDelete', name: string }

Response:

{ ok: true }

List all registered cron job schedules.

Request:

{ cmd: 'CronList' }

Response:

{
  ok: true,
  crons: Array<{
    name: string,
    queue: string,
    schedule: string | null,
    repeatEvery: number | null,
    nextRun: number,
    executions: number,
    maxLimit: number | undefined,
    timezone: string | undefined
  }>
}

Get a single cron job by name.

Request:

{ cmd: 'CronGet', name: string }

Response:

{
  ok: true,
  cron: {
    name: string,
    queue: string,
    schedule: string | null,
    repeatEvery: number | null,
    nextRun: number,
    executions: number,
    maxLimit: number | undefined,
    timezone: string | undefined
  }
}

Returns an error if the cron job is not found.


Connection health check.

Request:

{ cmd: 'Ping' }

Response:

{ ok: true, data: { pong: true, time: number } }

Protocol version negotiation and server capability discovery. See the Protocol Negotiation section above for details.

Request:

{
  cmd: 'Hello',
  protocolVersion: number,
  capabilities?: ['pipelining']
}

Response:

{
  ok: true,
  protocolVersion: number,
  capabilities: ['pipelining'],
  server: 'bunqueue',
  version: string
}

Get high-level server statistics.

Request:

{ cmd: 'Stats' }

Response:

{
  ok: true,
  stats: {
    queued: number, // Waiting jobs
    processing: number, // Active jobs
    delayed: number, // Delayed jobs
    dlq: number, // Dead-letter queue size
    completed: number, // Completed count
    uptime: number, // Server uptime in ms
    pushPerSec: number, // Push throughput
    pullPerSec: number // Pull throughput
  }
}

Get detailed server metrics.

Request:

{ cmd: 'Metrics' }

Response:

{
  ok: true,
  metrics: {
    totalPushed: number,
    totalPulled: number,
    totalCompleted: number,
    totalFailed: number,
    avgLatencyMs: number,
    avgProcessingMs: number,
    memoryUsageMb: number,
    sqliteSizeMb: number,
    activeConnections: number
  }
}

Get metrics in Prometheus text exposition format.

Request:

{ cmd: 'Prometheus' }

Response:

{ ok: true, data: { metrics: string } }

Get the storage/disk health status. Reports whether the disk is full or has errors.

Request:

{ cmd: 'StorageStatus' }

Response:

{
  ok: true,
  data: {
    diskFull: boolean, // Whether the disk is full
    error: string | null, // Error message if any
    since: number | null // Timestamp when the issue started (ms since epoch)
  }
}

Send a heartbeat for a registered worker (keeps the worker registration alive).

Request:

{ cmd: 'Heartbeat', id: string } // Worker ID

Response:

{ ok: true, data: { ok: true } }

Send a heartbeat for an active job (prevents stall detection from marking it as stalled). Also renews the lock if a token is provided.

Request:

{
  cmd: 'JobHeartbeat',
  id: string, // Job ID
  token?: string // Lock token for renewal
}

Response:

{ ok: true, data: { ok: true } }

Batch job heartbeat for multiple active jobs.

Request:

{
  cmd: 'JobHeartbeatB',
  ids: string[], // Job IDs
  tokens?: string[] // Lock tokens (same order as ids)
}

Response:

{ ok: true, data: { ok: true, count: number } }

Register a worker with the server for monitoring.

Request:

{
  cmd: 'RegisterWorker',
  name: string,
  queues: string[] // Queues this worker processes
}

Response:

{
  ok: true,
  data: {
    workerId: string,
    name: string,
    queues: string[],
    registeredAt: number
  }
}

Remove a worker registration.

Request:

{ cmd: 'UnregisterWorker', workerId: string }

Response:

{ ok: true, data: { removed: true } }

List all registered workers and their stats.

Request:

{ cmd: 'ListWorkers' }

Response:

{
  ok: true,
  data: {
    workers: Array<{
      id: string,
      name: string,
      queues: string[],
      registeredAt: number,
      lastSeen: number,
      activeJobs: number,
      processedJobs: number,
      failedJobs: number
    }>,
    stats: object // Aggregated worker stats
  }
}

Register a webhook to receive event notifications. URLs are validated to prevent SSRF (localhost, private IPs, and cloud metadata endpoints are blocked).

Request:

{
  cmd: 'AddWebhook',
  url: string, // Webhook URL (https required for production)
  events: string[], // Event types to subscribe to
  queue?: string, // Filter by queue (optional)
  secret?: string // Signing secret for payload verification
}

Response:

{
  ok: true,
  data: {
    webhookId: string,
    url: string,
    events: string[],
    queue: string | undefined,
    createdAt: number
  }
}

Remove a registered webhook.

Request:

{ cmd: 'RemoveWebhook', webhookId: string }

Response:

{ ok: true, data: { removed: true } }

List all registered webhooks.

Request:

{ cmd: 'ListWebhooks' }

Response:

{
  ok: true,
  data: {
    webhooks: Array<{
      id: string,
      url: string,
      events: string[],
      queue: string | undefined,
      createdAt: number,
      lastTriggered: number | null,
      successCount: number,
      failureCount: number,
      enabled: boolean
    }>,
    stats: object
  }
}

Set a rate limit on a queue (max jobs processed per second).

Request:

{
  cmd: 'RateLimit',
  queue: string,
  limit: number // Jobs per second
}

Response:

{ ok: true }

Remove the rate limit from a queue.

Request:

{ cmd: 'RateLimitClear', queue: string }

Response:

{ ok: true }

Set a concurrency limit on a queue (max concurrent active jobs).

Request:

{
  cmd: 'SetConcurrency',
  queue: string,
  limit: number
}

Response:

{ ok: true }

Remove the concurrency limit from a queue.

Request:

{ cmd: 'ClearConcurrency', queue: string }

Response:

{ ok: true }

Add a log entry to a job.

Request:

{
  cmd: 'AddLog',
  id: string, // Job ID
  message: string, // Log message
  level?: 'info' | 'warn' | 'error' // Log level (default: 'info')
}

Response:

{ ok: true, data: { added: true } }

Get all log entries for a job.

Request:

{ cmd: 'GetLogs', id: string }

Response:

{ ok: true, data: { logs: Array<{ message: string, level: string, timestamp: number }> } }

Queue names must satisfy the following constraints:

  • Not empty and at most 256 characters
  • Only alphanumeric characters, underscores, dashes, dots, and colons: [a-zA-Z0-9_\-.:]+

Job data payloads are limited to 10 MB when serialized.
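The queue-name rule can be checked client-side before a command is sent, which gives a clearer error than a server round trip (sketch; isValidQueueName is our name):

```typescript
// Matches the documented queue-name rule: 1-256 characters from [a-zA-Z0-9_\-.:]
const QUEUE_NAME = /^[a-zA-Z0-9_\-.:]{1,256}$/;

function isValidQueueName(name: string): boolean {
  return QUEUE_NAME.test(name);
}
```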

Category     Command                Description
Core         PUSH                   Add a job to a queue
             PUSHB                  Batch push multiple jobs
             PULL                   Pull next job (supports long poll and locks)
             PULLB                  Batch pull jobs
             ACK                    Acknowledge job completion
             ACKB                   Batch acknowledge
             FAIL                   Mark job as failed
Query        GetJob                 Get job by ID
             GetState               Get job state
             GetResult              Get job result
             GetJobs                List jobs with filtering
             GetJobCounts           Count jobs by state
             GetCountsPerPriority   Count jobs by priority
             GetJobByCustomId       Look up job by custom ID
             Count                  Total job count for a queue
             GetProgress            Get job progress
             GetChildrenValues      Get child job return values
Control      Cancel                 Cancel a job
             Progress               Update job progress
             Update                 Update job data
             ChangePriority         Change job priority
             Promote                Move delayed job to waiting
             MoveToDelayed          Move active job to delayed
             Discard                Move job to DLQ
             WaitJob                Wait for job completion
             Pause                  Pause a queue
             Resume                 Resume a queue
             IsPaused               Check if queue is paused
             Drain                  Remove all waiting jobs
             Obliterate             Remove all queue data
             Clean                  Remove old jobs
             ListQueues             List all queues
DLQ          Dlq                    Get DLQ entries
             RetryDlq               Retry DLQ jobs
             PurgeDlq               Clear DLQ
             RetryCompleted         Re-queue completed jobs
Cron         Cron                   Create/update cron schedule
             CronDelete             Delete cron schedule
             CronList               List cron schedules
             CronGet                Get cron schedule by name
Monitoring   Ping                   Health check
             Hello                  Protocol negotiation
             Stats                  Server statistics
             Metrics                Detailed metrics
             Prometheus             Prometheus-format metrics
             StorageStatus          Get storage/disk health status
             Heartbeat              Worker heartbeat
             JobHeartbeat           Job heartbeat (stall prevention)
             JobHeartbeatB          Batch job heartbeat
Workers      RegisterWorker         Register a worker
             UnregisterWorker       Unregister a worker
             ListWorkers            List workers
Webhooks     AddWebhook             Register a webhook
             RemoveWebhook          Remove a webhook
             ListWebhooks           List webhooks
Rate         RateLimit              Set queue rate limit
             RateLimitClear         Clear queue rate limit
             SetConcurrency         Set queue concurrency limit
             ClearConcurrency       Clear concurrency limit
Logs         AddLog                 Add job log entry
             GetLogs                Get job logs
Auth         Auth                   Authenticate connection