
TCP Protocol Architecture

bunqueue uses a high-performance binary protocol over TCP with MessagePack serialization and optional pipelining.

Wire Format

Each message is a length-prefixed MessagePack frame:

| Bytes | Content                                   |
|-------|-------------------------------------------|
| 0-3   | Frame length (4 bytes, big-endian uint32) |
| 4-N   | MessagePack payload                       |

Maximum frame size: 64 MB
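
For illustration, a minimal TypeScript sketch of writing one frame is shown below. It assumes a MessagePack encoder such as the msgpackr package (an assumption; any MessagePack library works) and follows the length prefix and 64 MB cap described above.

import { pack } from 'msgpackr';

const MAX_FRAME_SIZE = 64 * 1024 * 1024; // 64 MB cap from the wire format

// Encode one length-prefixed MessagePack frame.
function encodeFrame(message: unknown): Uint8Array {
  const payload = pack(message); // MessagePack body
  if (payload.length > MAX_FRAME_SIZE) {
    throw new Error('frame exceeds 64 MB limit');
  }
  const frame = new Uint8Array(4 + payload.length);
  new DataView(frame.buffer).setUint32(0, payload.length, false); // big-endian uint32
  frame.set(payload, 4);
  return frame;
}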

TCP Pipelining

Pipelining allows multiple commands to be sent without waiting for responses, dramatically improving throughput.

Without Pipelining (Sequential)

Client Server
│── PUSH job1 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
│── PUSH job2 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
│── PUSH job3 ────────────>│
│<── { ok, id } ───────────│ wait ~1ms
Total: 3 round-trips ≈ 3ms
Throughput: ~20,000 ops/sec

With Pipelining (Parallel)

Client Server
│── PUSH job1 (reqId:1) ──>│
│── PUSH job2 (reqId:2) ──>│ no wait
│── PUSH job3 (reqId:3) ──>│ no wait
│<── { ok, reqId:1 } ──────│
│<── { ok, reqId:2 } ──────│
│<── { ok, reqId:3 } ──────│
Total: 1 round-trip ≈ 1ms
Throughput: ~125,000 ops/sec

Result: 6x faster with pipelining enabled.

How Pipelining Works

  1. Client sends commands with unique reqId identifiers
  2. Server processes in parallel (up to 50 concurrent per connection)
  3. Responses include reqId for matching (may arrive out of order)
  4. Client matches responses using a Map<reqId, Promise> (see the sketch below)
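
As a rough illustration of step 4, a client can keep a map from reqId to the pending promise's resolvers. The names below (sendPipelined, Pending) are hypothetical and not bunqueue's actual client API; encodeFrame is the helper from the wire-format sketch above.

// Minimal sketch of reqId-based response matching (hypothetical names).
type Pending = { resolve: (v: any) => void; reject: (e: Error) => void };

const pending = new Map<number, Pending>();
let nextReqId = 1;

function sendPipelined(socket: { write(data: Uint8Array): void }, cmd: object): Promise<any> {
  const reqId = nextReqId++;
  return new Promise((resolve, reject) => {
    pending.set(reqId, { resolve, reject });
    socket.write(encodeFrame({ ...cmd, reqId })); // send without waiting for earlier responses
  });
}

// Called for every decoded response frame; responses may arrive out of order.
function onResponse(response: { reqId: number; ok: boolean; error?: string }) {
  const entry = pending.get(response.reqId);
  if (!entry) return;
  pending.delete(response.reqId);
  response.ok ? entry.resolve(response) : entry.reject(new Error(response.error));
}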

Configuration

const queue = new Queue('my-queue', {
  connection: {
    host: 'localhost',
    port: 6789,
    pipelining: true,      // Enable pipelining (default: true)
    maxInFlight: 100,      // Max concurrent commands (default: 100)
    poolSize: 32,          // Connection pool size
    commandTimeout: 30000  // Timeout per command (ms)
  }
});

| Option         | Default | Description                           |
|----------------|---------|---------------------------------------|
| pipelining     | true    | Enable TCP pipelining                 |
| maxInFlight    | 100     | Max commands in flight per connection |
| poolSize       | 4       | Number of TCP connections             |
| commandTimeout | 30000   | Command timeout (ms)                  |

Protocol Version Negotiation

On connect, the client and server negotiate the protocol version:

// Client → Server
{ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }
// Server → Client
{ ok: true, protocolVersion: 2, capabilities: ['pipelining'] }

Protocol v2 supports pipelining. Older clients that do not send Hello default to v1 (sequential).
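
A hedged sketch of the client side of this handshake, using the encodeFrame helper from the wire-format sketch; the helper names are illustrative, not the actual client implementation.

// Sketch of the client side of the handshake (helper names are illustrative).
function sendHello(socket: { write(data: Uint8Array): void }): void {
  // Hello is the first frame after the TCP connection opens.
  socket.write(encodeFrame({ cmd: 'Hello', protocolVersion: 2, capabilities: ['pipelining'] }));
}

// Enable pipelining only if the server reports v2 and the 'pipelining' capability;
// otherwise operate sequentially (v1 behaviour).
function shouldPipeline(res: { ok: boolean; protocolVersion: number; capabilities?: string[] }): boolean {
  return res.ok && res.protocolVersion >= 2 && (res.capabilities ?? []).includes('pipelining');
}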

Connection Lifecycle

States:

  1. DISCONNECTED → Initial state
  2. CONNECTING → Socket.connect() in progress
  3. CONNECTED → Ready for commands
  4. RECONNECTING → Auto-reconnect with backoff

Connect sequence:

  1. TCP socket connect
  2. Send Hello (protocol negotiation)
  3. Send Auth (if token configured)
  4. Start ping timer
  5. Ready for commands

Reconnect strategy:

  • Base delay: 100ms
  • Max delay: 30s
  • Backoff: exponential (2x each attempt)
  • Jitter: ±30%
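
A small sketch of that backoff schedule (100 ms base, 2x growth, 30 s cap, ±30% jitter); the function name is illustrative.

// Illustrative reconnect delay: exponential backoff with jitter.
function reconnectDelay(attempt: number): number {
  const base = 100;                                // 100 ms base delay
  const max = 30_000;                              // 30 s cap
  const exp = Math.min(max, base * 2 ** attempt);  // 2x per attempt
  const jitter = 1 + (Math.random() * 0.6 - 0.3);  // ±30%
  return Math.round(exp * jitter);
}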

Authentication

If AUTH_TOKENS is configured on the server, clients must authenticate:

// Client → Server
{ cmd: 'Auth', token: 'your-secret-token' }
// Server → Client
{ ok: true } // or { ok: false, error: 'Invalid token' }

Token comparison uses a constant-time algorithm to prevent timing attacks.
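
For example, a constant-time check can be built on timingSafeEqual from node:crypto (also available in Bun); hashing both values first keeps the buffers the same length, which timingSafeEqual requires. This is a sketch under those assumptions, not bunqueue's exact implementation.

import { createHash, timingSafeEqual } from 'node:crypto';

// Sketch: compare a presented token against a configured token in constant time.
function tokenMatches(presented: string, configured: string): boolean {
  // Hash both sides so the buffers always have equal length.
  const a = createHash('sha256').update(presented).digest();
  const b = createHash('sha256').update(configured).digest();
  return timingSafeEqual(a, b);
}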

Commands Reference

Core Commands

| Command | Description    | Request                                 | Response              |
|---------|----------------|-----------------------------------------|-----------------------|
| PUSH    | Add single job | { cmd, queue, data, priority?, delay? } | { ok, id }            |
| PUSHB   | Add batch      | { cmd, queue, jobs }                    | { ok, ids }           |
| PULL    | Get single job | { cmd, queue, timeout? }                | { ok, job, token? }   |
| PULLB   | Get batch      | { cmd, queue, count, timeout? }         | { ok, jobs, tokens? } |
| ACK     | Complete job   | { cmd, id, result?, token? }            | { ok }                |
| ACKB    | Complete batch | { cmd, ids, results?, tokens? }         | { ok }                |
| FAIL    | Fail job       | { cmd, id, error?, token? }             | { ok }                |
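
As a worked example grounded in the table above, one producer-to-worker exchange might look like the messages below; the job id, lock token, and the exact shape of the returned job object are illustrative, not guaranteed by the protocol.

// Producer → Server
{ cmd: 'PUSH', queue: 'emails', data: { to: 'a@example.com' }, priority: 5 }
// Server → Producer
{ ok: true, id: 'job-42' }

// Worker → Server
{ cmd: 'PULL', queue: 'emails', timeout: 5000 }
// Server → Worker
{ ok: true, job: { id: 'job-42', data: { to: 'a@example.com' } }, token: 'lock-abc' }

// Worker → Server (after processing)
{ cmd: 'ACK', id: 'job-42', token: 'lock-abc' }
// Server → Worker
{ ok: true }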

Query Commands

| Command              | Description                |
|----------------------|----------------------------|
| GetJob               | Get job by ID              |
| GetJobByCustomId     | Get job by custom ID       |
| GetState             | Get job state              |
| GetResult            | Get job result             |
| GetJobs              | List jobs with filters     |
| GetJobCounts         | Queue statistics           |
| GetCountsPerPriority | Counts grouped by priority |
| GetProgress          | Get job progress           |
| Count                | Count jobs in queue        |

Control Commands

| Command       | Description                 |
|---------------|-----------------------------|
| Pause         | Stop processing queue       |
| Resume        | Resume processing           |
| IsPaused      | Check if queue is paused    |
| Drain         | Remove waiting jobs         |
| Obliterate    | Delete queue completely     |
| Clean         | Remove old jobs             |
| Cancel        | Cancel pending job          |
| Promote       | Move delayed job to waiting |
| MoveToDelayed | Move job to delayed state   |
| Progress      | Update job progress         |
| ListQueues    | List all queues             |

DLQ Commands

| Command        | Description          |
|----------------|----------------------|
| Dlq            | List DLQ entries     |
| RetryDlq       | Retry failed jobs    |
| RetryCompleted | Retry completed jobs |
| PurgeDlq       | Clear DLQ            |

Cron Commands

| Command    | Description          |
|------------|----------------------|
| Cron       | Add scheduled job    |
| CronDelete | Remove scheduled job |
| CronList   | List all cron jobs   |

Monitoring Commands

| Command          | Description                 |
|------------------|-----------------------------|
| Stats            | Server statistics           |
| Metrics          | Queue metrics               |
| Prometheus       | Prometheus format           |
| Ping             | Health check                |
| Heartbeat        | Worker heartbeat            |
| JobHeartbeat     | Per-job heartbeat           |
| AddLog           | Add job log entry           |
| GetLogs          | Get job logs                |
| RegisterWorker   | Register worker with server |
| UnregisterWorker | Unregister worker           |
| ListWorkers      | List registered workers     |

Connection Pool

The client maintains a pool of TCP connections for load balancing:

// Default: 4 connections, configurable via poolSize
const pool = new TcpConnectionPool({
  host: 'localhost',
  port: 6789,
  poolSize: 32 // 32 connections for high throughput
});

Selection strategy: Round-robin, preferring connected sockets.
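
A rough sketch of that selection strategy (round-robin, preferring connected sockets); the connection shape and names below are hypothetical.

// Illustrative round-robin pick that skips disconnected sockets.
type PooledConnection = { state: 'CONNECTED' | 'CONNECTING' | 'RECONNECTING' | 'DISCONNECTED' };

function pickConnection(connections: PooledConnection[], cursor: { index: number }): PooledConnection {
  for (let i = 0; i < connections.length; i++) {
    const candidate = connections[(cursor.index + i) % connections.length];
    if (candidate.state === 'CONNECTED') {
      cursor.index = (cursor.index + i + 1) % connections.length;
      return candidate;
    }
  }
  // No connected socket yet: fall back to plain round-robin.
  const fallback = connections[cursor.index % connections.length];
  cursor.index = (cursor.index + 1) % connections.length;
  return fallback;
}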

Features:

  • Automatic reconnection
  • Health tracking (latency, errors)
  • Shared pools (reference counted)

Client Disconnect Handling

When a client disconnects, the server:

  1. Identifies all jobs owned by client
  2. Releases job locks (returns to queue)
  3. Cleans up client tracking

Jobs with active locks are automatically requeued for other workers.

Validation Limits

| Parameter    | Limit                              |
|--------------|------------------------------------|
| Queue name   | Max 256 chars, alphanumeric + _-.: |
| Job data     | Max 10 MB JSON                     |
| Priority     | -1,000,000 to +1,000,000           |
| Delay        | 0 to 365 days                      |
| Timeout      | 0 to 24 hours                      |
| Max attempts | 1 to 1,000                         |
| Backoff      | 0 to 24 hours                      |
| TTL          | 0 to 365 days                      |

HTTP Endpoints

bunqueue also exposes an HTTP API on port 6790:

| Endpoint            | Method | Description           |
|---------------------|--------|-----------------------|
| /health             | GET    | Health + memory stats |
| /healthz            | GET    | Kubernetes liveness   |
| /ready              | GET    | Kubernetes readiness  |
| /prometheus         | GET    | Prometheus metrics    |
| /stats              | GET    | JSON statistics       |
| /queues/:queue/jobs | POST   | Add job               |
| /queues/:queue/jobs | GET    | Pull job              |
| /jobs/:id           | GET    | Get job               |
| /jobs/:id/ack       | POST   | Acknowledge           |
| /jobs/:id/fail      | POST   | Fail                  |
| /ws                 | GET    | WebSocket             |
| /events             | GET    | Server-Sent Events    |
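
For instance, adding a job over HTTP, pulling it, and acknowledging it might look like the fetch calls below; the request body shape and response fields are assumptions based on the TCP commands above, not a documented HTTP schema.

// Hypothetical example: push a job to the 'emails' queue over HTTP, then pull and ack it.
const base = 'http://localhost:6790';

const pushRes = await fetch(`${base}/queues/emails/jobs`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ data: { to: 'a@example.com' } }) // body shape is an assumption
});
const { id } = await pushRes.json();

const pullRes = await fetch(`${base}/queues/emails/jobs`); // GET pulls a job
const job = await pullRes.json();

await fetch(`${base}/jobs/${id}/ack`, { method: 'POST' }); // acknowledge completion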