Skip to content

bunqueue TCP Protocol Architecture: Wire Format & Pipelining

bunqueue uses a high-performance binary protocol over TCP with MessagePack serialization and optional pipelining.

Each message is a length-prefixed MessagePack frame:

BytesContent
0-3Frame length (4 bytes, big-endian uint32)
4-NMessagePack payload

Maximum frame size: 64 MB

Pipelining allows multiple commands to be sent without waiting for responses, dramatically improving throughput.

Client Server
│── PUSH job1 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
│── PUSH job2 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
│── PUSH job3 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
Total: 3 round-trips ≈ 3ms
Throughput: ~20,000 ops/sec
Client Server
│── PUSH job1 (reqId:1) ──>│
│── PUSH job2 (reqId:2) ──>│ no wait
│── PUSH job3 (reqId:3) ──>│ no wait
│<── { ok, reqId:1 } ──────│
│<── { ok, reqId:2 } ──────│
│<── { ok, reqId:3 } ──────│
Total: 1 round-trip ≈ 1ms
Throughput: ~125,000 ops/sec

Result: 6x faster with pipelining enabled.

  1. Client sends commands with unique reqId identifiers
  2. Server processes in parallel (up to 50 concurrent per connection)
  3. Responses include reqId for matching (may arrive out of order)
  4. Client matches responses using a Map<reqId, Promise>
const queue = new Queue('my-queue', {
connection: {
host: 'localhost',
port: 6789,
pipelining: true, // Enable pipelining (default: true)
maxInFlight: 100, // Max concurrent commands (default: 100)
poolSize: 32, // Connection pool size
commandTimeout: 30000 // Timeout per command (ms)
}
});
OptionDefaultDescription
pipeliningtrueEnable TCP pipelining
maxInFlight100Max commands in flight per connection
poolSize4Number of TCP connections
commandTimeout30000Command timeout (ms)

On connect, client and server negotiate protocol version:

// Client → Server
{ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }
// Server → Client
{ ok: true, protocolVersion: 2, capabilities: ['pipelining'] }

Protocol v2 supports pipelining. Older clients without Hello default to v1 (sequential).

States:

  1. DISCONNECTED → Initial state
  2. CONNECTING → Socket.connect() in progress
  3. CONNECTED → Ready for commands
  4. RECONNECTING → Auto-reconnect with backoff

Connect sequence:

  1. TCP socket connect
  2. Send Hello (protocol negotiation)
  3. Send Auth (if token configured)
  4. Start ping timer
  5. Ready for commands

Reconnect strategy:

  • Base delay: 100ms
  • Max delay: 30s
  • Backoff: exponential (2x each attempt)
  • Jitter: ±30%

If AUTH_TOKENS is configured on the server, clients must authenticate:

// Client → Server
{ cmd: 'Auth', token: 'your-secret-token' }
// Server → Client
{ ok: true } // or { ok: false, error: 'Invalid token' }

Token comparison uses constant-time algorithm to prevent timing attacks.

CommandDescriptionRequestResponse
PUSHAdd single job{ cmd, queue, data, priority?, delay? }{ ok, id }
PUSHBAdd batch{ cmd, queue, jobs }{ ok, ids }
PULLGet single job{ cmd, queue, timeout? }{ ok, job, token? }
PULLBGet batch{ cmd, queue, count, timeout? }{ ok, jobs, tokens? }
ACKComplete job{ cmd, id, result?, token? }{ ok }
ACKBComplete batch{ cmd, ids, results?, tokens? }{ ok }
FAILFail job{ cmd, id, error?, token? }{ ok }
CommandDescription
GetJobGet job by ID
GetJobByCustomIdGet job by custom ID
GetStateGet job state
GetResultGet job result
GetJobsList jobs with filters
GetJobCountsQueue statistics
GetCountsPerPriorityCounts grouped by priority
GetProgressGet job progress
CountCount jobs in queue
CommandDescription
PauseStop processing queue
ResumeResume processing
IsPausedCheck if queue is paused
DrainRemove waiting jobs
ObliterateDelete queue completely
CleanRemove old jobs
CancelCancel pending job
PromoteMove delayed job to waiting
MoveToDelayedMove job to delayed state
ProgressUpdate job progress
ListQueuesList all queues
CommandDescription
DlqList DLQ entries
RetryDlqRetry failed jobs
RetryCompletedRetry completed jobs
PurgeDlqClear DLQ
CommandDescription
CronAdd scheduled job
CronDeleteRemove scheduled job
CronListList all cron jobs
CommandDescription
StatsServer statistics
MetricsQueue metrics
PrometheusPrometheus format
PingHealth check
HeartbeatWorker heartbeat
JobHeartbeatPer-job heartbeat
AddLogAdd job log entry
GetLogsGet job logs
RegisterWorkerRegister worker with server
UnregisterWorkerUnregister worker
ListWorkersList registered workers

The client maintains a pool of TCP connections for load balancing:

// Default: 4 connections, configurable via poolSize
const pool = new TcpConnectionPool({
host: 'localhost',
port: 6789,
poolSize: 32 // 32 connections for high throughput
});

Selection strategy: Round-robin, preferring connected sockets.

Features:

  • Automatic reconnection
  • Health tracking (latency, errors)
  • Shared pools (reference counted)

When a client disconnects, the server:

  1. Identifies all jobs owned by client
  2. Releases job locks (returns to queue)
  3. Cleans up client tracking

Jobs with active locks are automatically requeued for other workers.

ParameterLimit
Queue nameMax 256 chars, alphanumeric + _-.:
Job dataMax 10 MB JSON
Priority-1,000,000 to +1,000,000
Delay0 to 365 days
Timeout0 to 24 hours
Max attempts1 to 1,000
Backoff0 to 24 hours
TTL0 to 365 days

bunqueue also exposes an HTTP API on port 6790:

EndpointMethodDescription
/healthGETHealth + memory stats
/healthzGETKubernetes liveness
/readyGETKubernetes readiness
/prometheusGETPrometheus metrics
/statsGETJSON statistics
/queues/:queue/jobsPOSTAdd job
/queues/:queue/jobsGETPull job
/jobs/:idGETGet job
/jobs/:id/ackPOSTAcknowledge
/jobs/:id/failPOSTFail
/wsGETWebSocket
/eventsGETServer-Sent Events