IoT & Edge: MQTT to Job Queue on a Gateway
bunqueue fits where a Redis + BullMQ stack does not: a single Bun process with one SQLite file, running on an edge gateway next to your sensors. This guide covers the recommended IoT architecture, the MQTT bridge pattern, offline buffering, and secure forwarding to a central server with native TLS.
Where bunqueue fits (and where it doesn’t)
Section titled “Where bunqueue fits (and where it doesn’t)”| Scenario | Fit |
|---|---|
| Edge gateway (Raspberry Pi 4/5, Jetson, ARM64/x64 mini-PC) | ✅ embedded queue, store-and-forward |
| Backend telemetry ingestion (absorb bursts, retry, DLQ) | ✅ |
| Offline-first buffering (flaky uplink) | ✅ SQLite WAL persistence |
| Replacement for an MQTT broker | ❌ use Mosquitto/EMQX, bridge into bunqueue |
| Directly on microcontrollers (ESP32, ARMv7 32-bit) | ❌ Bun requires ARM64/x64 — devices talk to the gateway |
The pattern that works:
sensors ──MQTT──► broker (Mosquitto/EMQX) ──► bridge ──► bunqueue ──► Worker (SQLite) │ ▼ backend / TSDB / alertsDevices keep speaking MQTT (their native protocol). The bridge — a ~30 line Bun script — subscribes to topics and turns each message into a persisted job. From there you get everything a queue gives you that a broker does not: retries with backoff, dead letter queue, priorities, delayed jobs, cron aggregations, and a durable buffer when the uplink is down.
The MQTT bridge
Section titled “The MQTT bridge”Full runnable version in
examples/mqtt-bridge/.
The core is this:
import mqtt from 'mqtt';import { Queue, Worker } from 'bunqueue/client';
// Embedded queue: no server process, persisted to SQLite on the gatewayconst queue = new Queue('telemetry', { embedded: true, dataPath: './edge-queue.db',});
const client = mqtt.connect('mqtt://localhost:1883');client.on('connect', () => client.subscribe('sensors/#'));
client.on('message', (topic, payload) => { void queue.add( 'reading', { topic, payload: JSON.parse(payload.toString()), receivedAt: Date.now() }, { attempts: 5 } );});
// Process locally, or forward to your backendconst worker = new Worker( 'telemetry', async (job) => { // POST to backend, write to TSDB, trigger alerts... return { processed: true }; }, { embedded: true, dataPath: './edge-queue.db', concurrency: 10 });Run it:
bun add mqttMQTT_URL=mqtt://localhost:1883 bun examples/mqtt-bridge/index.ts
# publish a test readingmosquitto_pub -t sensors/temp/room1 -m '{"temp":21.5}'Offline buffering
Section titled “Offline buffering”The embedded queue writes to SQLite (WAL mode) on the gateway. If the worker’s
forwarding target is unreachable, jobs fail and are retried with exponential
backoff; after attempts exhausted they land in the DLQ instead of being
lost. When connectivity returns, queue.retryDlq() re-enqueues them.
For readings you cannot afford to lose even across a power cut in the 10ms write-buffer window, use durable mode per job:
await queue.add('critical-alarm', data, { durable: true }); // immediate fsyncThroughput trade-off: buffered ~100k jobs/sec, durable ~10k jobs/sec — both far beyond typical sensor rates.
Forwarding to a central server (TLS)
Section titled “Forwarding to a central server (TLS)”To process centrally instead of on the gateway, point the queue at a remote bunqueue over TCP with native TLS:
const queue = new Queue('telemetry', { connection: { host: 'queue.example.com', port: 6789, tls: true, // or { caFile: './ca.pem' } for a private CA token: Bun.env.BQ_TOKEN, },});Central server:
bunqueue start \ --tls-cert /etc/bunqueue/cert.pem \ --tls-key /etc/bunqueue/key.pem \ --auth-tokens "$TOKEN" \ --data-path /var/lib/bunqueue/queue.dbBuilt-in store-and-forward: queue.forward()
Section titled “Built-in store-and-forward: queue.forward()”The recommended hybrid — embedded queue on the gateway as the offline buffer, drained to the central server when the uplink is healthy — is a one-liner:
const local = new Queue('telemetry', { embedded: true, dataPath: './edge.db' });
const forwarder = local.forward({ to: { host: 'queue.example.com', port: 6789, tls: true, token: Bun.env.BQ_TOKEN }, queue: 'telemetry-ingest', // optional remote name (default: same) concurrency: 4,});
forwarder.on('forwarded', ({ id, remoteId }) => console.log(`→ ${id} as ${remoteId}`));forwarder.on('error', (err) => console.error('uplink:', err.message));
// later: await forwarder.close();Semantics:
- Nothing lost offline: a remote failure fails the job locally → local
retry with backoff → local DLQ after
attempts. When the uplink returns,local.retryDlq()re-enqueues buffered readings. - Dedup on re-forward: each forwarded job carries the deterministic
remote jobId
fwd:<queue>:<localId>; the server dedupes custom jobIds, so a re-forward after a crash or retry is idempotent within the server’s retention window (custom-id map is a bounded LRU, andremoveOnCompleteon the remote evicts the entry). For strict exactly-once across long outages, keepremoveOnComplete: falseremotely or dedupe downstream. - Priority is preserved; pass
durable: trueto fsync each job server-side.
Cron aggregations on the gateway
Section titled “Cron aggregations on the gateway”Downsample locally before forwarding — cheaper uplink, less central load:
await queue.upsertJobScheduler('aggregate-5m', { every: 5 * 60 * 1000 }, { name: 'aggregate', data: { window: '5m' },});Hardware notes
Section titled “Hardware notes”- Runtime: Bun runs on Linux/macOS ARM64 and x64. Raspberry Pi 4/5 with a 64-bit OS works; 32-bit ARMv7 boards (Pi Zero/2, most ESP-class hardware) do not — those devices publish MQTT to the gateway instead.
- Footprint: single process, no Redis container. SQLite file size is the
main disk consideration — bound it with
removeOnComplete, DLQmaxAge/maxEntries, and periodicqueue.clean(graceMs, limit). - Reliability: enable S3 backup on gateways with object storage access, or ship the SQLite file with your own sync.
See also
Section titled “See also”- Native TLS — cert setup, client options, self-signed certs
examples/mqtt-bridge/— runnable bridge- Stall Detection and DLQ — what happens to stuck/poison readings