bunqueue with Hono and Elysia: Bun Framework Integration Guide
bunqueue is designed for the Bun ecosystem, and Hono and Elysia are two of the most popular web frameworks for Bun. Here’s how to integrate them cleanly.
Hono Integration
Basic Setup

    import { Hono } from 'hono';
    import { Queue, Worker } from 'bunqueue/client';

    // Create queue and worker
    const emailQueue = new Queue<{ to: string; subject: string }>('emails', {
      embedded: true,
    });

    const worker = new Worker('emails', async (job) => {
      await sendEmail(job.data.to, job.data.subject);
      return { sent: true };
    }, { embedded: true, concurrency: 5 });

    // Create Hono app
    const app = new Hono();

    app.post('/api/send-email', async (c) => {
      const { to, subject } = await c.req.json();

      const job = await emailQueue.add('send', { to, subject });

      return c.json({ jobId: job.id, status: 'queued' });
    });

    app.get('/api/job/:id', async (c) => {
      const job = await emailQueue.getJob(c.req.param('id'));

      if (!job) return c.json({ error: 'Not found' }, 404);

      return c.json({
        id: job.id,
        state: await job.getState(),
        progress: job.progress,
        data: job.data,
      });
    });

    export default app;
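The `POST /api/send-email` handler passes whatever JSON it receives straight to the queue, so a malformed body only fails later inside the worker. One possible way to reject bad input before enqueuing is a small validation helper. This is a minimal sketch: `parseEmailPayload` and its error messages are hypothetical and not part of bunqueue or Hono.

```typescript
// Hypothetical helper for illustration; not part of bunqueue or Hono.
type EmailPayload = { to: string; subject: string };

type ParseResult =
  | { ok: true; value: EmailPayload }
  | { ok: false; error: string };

// Narrow an unknown JSON body to the shape the 'emails' queue expects.
function parseEmailPayload(body: unknown): ParseResult {
  if (typeof body !== 'object' || body === null) {
    return { ok: false, error: 'Body must be a JSON object' };
  }
  const { to, subject } = body as Record<string, unknown>;
  // Deliberately loose check: only verifies that an "@" is present.
  if (typeof to !== 'string' || !to.includes('@')) {
    return { ok: false, error: '"to" must be an email address' };
  }
  if (typeof subject !== 'string' || subject.trim() === '') {
    return { ok: false, error: '"subject" must be a non-empty string' };
  }
  return { ok: true, value: { to, subject } };
}
```

In the handler, the result of `parseEmailPayload(await c.req.json())` could be checked first, returning `c.json({ error }, 400)` when `ok` is false and calling `emailQueue.add` only with `value`.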
Middleware Pattern

Create a reusable middleware that injects the queue into the context:

    import { Hono } from 'hono';
    import { createMiddleware } from 'hono/factory';
    import { Queue } from 'bunqueue/client';

    // Queue factory middleware
    export function withQueue<T>(name: string) {
      const queue = new Queue<T>(name, { embedded: true });

      return createMiddleware(async (c, next) => {
        c.set('queue', queue);
        await next();
      });
    }

    // Usage
    const app = new Hono();

    app.use('/api/orders/*', withQueue<OrderData>('orders'));

    app.post('/api/orders', async (c) => {
      const queue = c.get('queue') as Queue<OrderData>;
      const order = await c.req.json();

      const job = await queue.add('process', order, {
        priority: order.express ? 10 : 1,
        attempts: 3,
      });

      return c.json({ orderId: job.id });
    });
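A detail worth noticing in `withQueue` is that the queue is created when the factory is called, not on every request; the returned middleware only hands out that one instance. The sketch below illustrates the same closure shape with stand-in types so it runs without Hono or bunqueue. `Ctx`, `FakeQueue`, and `withFakeQueue` are hypothetical names used only for this illustration.

```typescript
// Minimal stand-ins (assumptions) so the sketch runs without Hono or bunqueue.
type Ctx = { vars: Map<string, unknown> };
type Next = () => Promise<void>;
type Middleware = (c: Ctx, next: Next) => Promise<void>;

let created = 0; // counts how many queue instances were constructed

class FakeQueue {
  readonly name: string;
  constructor(name: string) {
    this.name = name;
    created++;
  }
}

// Same shape as withQueue: the queue is built once, when the factory runs.
function withFakeQueue(name: string): Middleware {
  const queue = new FakeQueue(name);
  // The middleware itself only stores a reference to that shared instance.
  return async (c, next) => {
    c.vars.set('queue', queue);
    await next();
  };
}

// Simulate two requests passing through the same middleware.
async function demo(): Promise<boolean> {
  const mw = withFakeQueue('orders');
  const first: Ctx = { vars: new Map() };
  const second: Ctx = { vars: new Map() };
  await mw(first, async () => {});
  await mw(second, async () => {});
  return first.vars.get('queue') === second.vars.get('queue');
}
```

Both simulated requests see the same object, which is why the factory should be called once at startup rather than inside a handler.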
Elysia Integration

Basic Setup

    import { Elysia } from 'elysia';
    import { Queue, Worker } from 'bunqueue/client';

    const taskQueue = new Queue<{ type: string; payload: unknown }>('tasks', {
      embedded: true,
    });

    const worker = new Worker('tasks', async (job) => {
      switch (job.data.type) {
        case 'resize-image':
          return await resizeImage(job.data.payload);
        case 'generate-pdf':
          return await generatePdf(job.data.payload);
        default:
          throw new Error(`Unknown task type: ${job.data.type}`);
      }
    }, { embedded: true, concurrency: 3 });

    const app = new Elysia()
      .post('/tasks', async ({ body }) => {
        const job = await taskQueue.add(body.type, body);
        return { jobId: job.id, status: 'queued' };
      })
      .get('/tasks/:id', async ({ params, set }) => {
        const job = await taskQueue.getJob(params.id);
        if (!job) {
          set.status = 404; // respond with 404 instead of the default 200
          return { error: 'Not found' };
        }
        return {
          id: job.id,
          state: await job.getState(),
          progress: job.progress,
        };
      })
      .get('/tasks/counts', async () => {
        return await taskQueue.getJobCountsAsync();
      })
      .listen(3000);
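As the number of task types grows, the `switch` in the worker can become long. One alternative, sketched here under assumptions, is a lookup table of handlers keyed by task type. The handler bodies below are placeholders standing in for the real `resizeImage` and `generatePdf` functions, and `dispatch` is a hypothetical name.

```typescript
type TaskHandler = (payload: unknown) => Promise<unknown>;

// Placeholder handlers; in a real app these would call resizeImage / generatePdf.
const handlers = new Map<string, TaskHandler>([
  ['resize-image', async (payload) => ({ resized: payload })],
  ['generate-pdf', async (payload) => ({ pdf: payload })],
]);

// Look up the handler by task type; unknown types fail by throwing,
// which mirrors the `default` branch of the switch.
async function dispatch(type: string, payload: unknown): Promise<unknown> {
  const handler = handlers.get(type);
  if (!handler) {
    throw new Error(`Unknown task type: ${type}`);
  }
  return handler(payload);
}
```

The worker's processor could then reduce to a single call such as `dispatch(job.data.type, job.data.payload)`, and adding a task type means adding one map entry.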
Plugin Pattern

Encapsulate the queue as an Elysia plugin:

    import { Elysia } from 'elysia';
    import { Queue, Worker } from 'bunqueue/client';

    export function queuePlugin<T>(name: string) {
      const queue = new Queue<T>(name, { embedded: true });

      return new Elysia({ name: `queue-${name}` })
        .decorate('queue', queue)
        .get(`/queues/${name}/counts`, async ({ queue }) => {
          return await queue.getJobCountsAsync();
        })
        .get(`/queues/${name}/health`, async ({ queue }) => {
          const counts = await queue.getJobCountsAsync();
          return {
            name,
            ...counts,
            healthy: counts.active < 1000,
          };
        });
    }

    // Usage
    const app = new Elysia()
      .use(queuePlugin<EmailData>('emails'))
      .use(queuePlugin<OrderData>('orders'))
      .listen(3000);
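The health route reduces queue health to a single check on `counts.active`. If more signals are wanted, the decision can be pulled into a pure function that is easy to test. This is a sketch under assumptions: only `active` appears in the guide, so the `waiting` and `failed` fields, the `Limits` type, and the default thresholds other than 1000 are all hypothetical.

```typescript
// Assumed shape: only `active` is confirmed by the guide;
// `waiting` and `failed` are assumptions for this sketch.
type JobCounts = { waiting: number; active: number; failed: number };
type Limits = { maxActive: number; maxWaiting: number; maxFailed: number };

// Illustrative thresholds; 1000 matches the guide, the others are made up.
const defaultLimits: Limits = { maxActive: 1000, maxWaiting: 10000, maxFailed: 100 };

// Return an overall verdict plus the reasons, so a health endpoint can report both.
function evaluateHealth(counts: JobCounts, limits: Limits = defaultLimits) {
  const problems: string[] = [];
  if (counts.active >= limits.maxActive) problems.push('too many active jobs');
  if (counts.waiting >= limits.maxWaiting) problems.push('backlog too large');
  if (counts.failed >= limits.maxFailed) problems.push('too many failed jobs');
  return { healthy: problems.length === 0, problems };
}
```

Keeping the thresholds in one object makes it straightforward to tune them per queue.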
Graceful Shutdown

Both frameworks need explicit shutdown handling so that in-flight jobs finish before the process exits:

    import { Queue, Worker } from 'bunqueue/client';

    // `processor` is your job handler function
    const queue = new Queue('tasks', { embedded: true });
    const worker = new Worker('tasks', processor, {
      embedded: true,
      concurrency: 5,
    });

    // Handle shutdown signals
    async function gracefulShutdown() {
      console.log('Shutting down...');

      // 1. Stop accepting new HTTP requests
      //    (`server` is your running HTTP server handle, e.g. from Bun.serve())
      server.stop();

      // 2. Close worker (finishes active jobs)
      await worker.close();

      // 3. Disconnect queue (flushes buffers)
      await queue.disconnect();

      process.exit(0);
    }

    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);
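Two practical gaps in a handler like `gracefulShutdown` are repeated signals (a second Ctrl+C would start the sequence again) and steps that never finish. The sketch below shows one way to address both: run the steps in order at most once, and stop waiting after a deadline. `createShutdown`, `Step`, and the timeout behaviour are hypothetical helpers for illustration, not part of bunqueue.

```typescript
type Step = { name: string; run: () => Promise<void> | void };
type Outcome = 'done' | 'timeout';

// Build a shutdown function that runs the steps sequentially, at most once,
// and resolves with 'timeout' if they take longer than timeoutMs.
// Note: on timeout the remaining steps are not cancelled, only abandoned.
function createShutdown(steps: Step[], timeoutMs: number) {
  let started: Promise<Outcome> | undefined;

  return function shutdown(): Promise<Outcome> {
    if (started) return started; // repeated signal: reuse the in-flight shutdown

    const runAll = async (): Promise<Outcome> => {
      for (const step of steps) {
        console.log(`shutdown: ${step.name}`);
        await step.run();
      }
      return 'done';
    };

    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<Outcome>((resolve) => {
      timer = setTimeout(() => resolve('timeout'), timeoutMs);
    });

    started = (async () => {
      try {
        return await Promise.race([runAll(), deadline]);
      } finally {
        clearTimeout(timer); // do not keep the process alive for the timer
      }
    })();
    return started;
  };
}
```

The three steps from the handler (`server.stop()`, `worker.close()`, `queue.disconnect()`) would be passed in as `Step` objects, and each signal handler would call the returned function and then exit with a code chosen from the outcome.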
TCP Mode for Microservices

If your web app and workers run as separate processes, use TCP mode:

    // Web server (process 1)
    const queue = new Queue('tasks', {
      connection: { host: 'bunqueue-server', port: 6789 },
    });

    // Worker (process 2)
    const worker = new Worker('tasks', processor, {
      connection: { host: 'bunqueue-server', port: 6789 },
      concurrency: 10,
    });

This separates concerns: the web server only adds jobs, and dedicated worker processes handle processing. bunqueue’s connection pooling and auto-batching make this efficient even under high load.
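Because the web server and the workers must agree on the same host and port, it can help to derive the `connection` object from the environment in one place rather than hard-coding it in each process. A minimal sketch follows; the variable names `BUNQUEUE_HOST` and `BUNQUEUE_PORT` and the helper `connectionFromEnv` are hypothetical, while the default port 6789 is taken from the example above.

```typescript
type Connection = { host: string; port: number };

// BUNQUEUE_HOST / BUNQUEUE_PORT are hypothetical names chosen for this sketch.
function connectionFromEnv(env: Record<string, string | undefined>): Connection {
  const host = env['BUNQUEUE_HOST'] ?? 'localhost';
  const rawPort = env['BUNQUEUE_PORT'] ?? '6789';
  const port = Number(rawPort);
  // Fail fast on typos instead of connecting to the wrong place.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid BUNQUEUE_PORT: ${rawPort}`);
  }
  return { host, port };
}
```

Both processes could then pass `connectionFromEnv(process.env)` as the `connection` option, so a deployment only changes environment variables.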
Queue Groups for Multi-Queue Apps
Organize related queues with QueueGroup:

    import { QueueGroup } from 'bunqueue/client';

    const billing = new QueueGroup('billing');

    const invoices = billing.getQueue<InvoiceData>('invoices');
    const payments = billing.getQueue<PaymentData>('payments');
    const refunds = billing.getQueue<RefundData>('refunds');

    // Queue names are prefixed: "billing:invoices", "billing:payments", etc.

    // Pause all billing queues at once (e.g., during maintenance)
    billing.pauseAll();
    billing.resumeAll();
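To make the naming convention concrete, the sketch below models only the prefixing and bookkeeping that a group implies: each queue name is qualified as `group:queue`, and the group remembers which names it issued so that a bulk operation knows what to touch. This is an illustration, not bunqueue's implementation of `QueueGroup`; `NamePrefixer` and its methods are hypothetical.

```typescript
// Illustration only: this is not how bunqueue implements QueueGroup.
class NamePrefixer {
  private readonly prefix: string;
  private readonly names = new Set<string>();

  constructor(prefix: string) {
    this.prefix = prefix;
  }

  // Produce the "group:queue" name and remember it.
  qualify(name: string): string {
    const full = `${this.prefix}:${name}`;
    this.names.add(full);
    return full;
  }

  // Every name issued so far, in insertion order; a pauseAll()-style
  // operation would iterate over exactly this list.
  list(): string[] {
    return Array.from(this.names);
  }
}
```

For example, `new NamePrefixer('billing').qualify('invoices')` yields `billing:invoices`, matching the prefixed names noted in the comment above.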