Skip to content

Persistence Layer

Persistence Layer

bunqueue uses SQLite with WAL mode for persistence, optimized for high-throughput job processing.

SQLite Configuration

┌─────────────────────────────────────────────────────────────┐
│ SQLITE PRAGMAS │
│ │
│ journal_mode = WAL │ Write-Ahead Logging │
│ synchronous = NORMAL │ Balanced safety/performance │
│ cache_size = -64000 │ 64MB in-memory cache │
│ temp_store = MEMORY │ In-memory temp tables │
│ mmap_size = 268435456 │ 256MB memory-mapped I/O │
│ page_size = 4096 │ 4KB pages │
└─────────────────────────────────────────────────────────────┘

Write Buffer Architecture

┌─────────────────────────────────────────────────────────────┐
│ WRITE BUFFER │
│ │
│ Job arrives │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Add to buffer │ │
│ │ (max 100 jobs) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌─────┴─────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ Buffer full (100) Timer fires (10ms) │
│ │ │ │
│ └─────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ BATCH INSERT │ │
│ │ │ │
│ │ INSERT INTO jobs VALUES │ │
│ │ (...), (...), (...), ... │ │
│ │ │ │
│ │ • Prepared statement cache (1-100 rows) │ │
│ │ • Single transaction │ │
│ │ • 50-100x faster than individual │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Buffered vs Durable Mode

┌─────────────────────────────────────────────────────────────┐
│ WRITE MODES │
│ │
│ ┌────────────────────┬───────────────────────────────────┐│
│ │ BUFFERED (default)│ DURABLE (opt-in per job) ││
│ ├────────────────────┼───────────────────────────────────┤│
│ │ ~100k jobs/sec │ ~10k jobs/sec ││
│ │ Up to 10ms loss │ No data loss ││
│ │ Batched writes │ Immediate write ││
│ │ │ ││
│ │ Use for: │ Use for: ││
│ │ • Emails │ • Payments ││
│ │ • Notifications │ • Critical events ││
│ │ • Analytics │ • Financial transactions ││
│ └────────────────────┴───────────────────────────────────┘│
│ │
│ Usage: │
│ queue.add('job', data, { durable: true }) │
└─────────────────────────────────────────────────────────────┘

Database Schema

┌─────────────────────────────────────────────────────────────┐
│ TABLES │
│ │
│ jobs (23 columns) │
│ ├─ id: TEXT PRIMARY KEY (UUIDv7) │
│ ├─ queue: TEXT │
│ ├─ data: BLOB (MessagePack) │
│ ├─ priority: INTEGER │
│ ├─ state: TEXT │
│ ├─ run_at: INTEGER │
│ ├─ attempts: INTEGER │
│ └─ ... (20 more fields) │
│ │
│ Indexes: │
│ ├─ (queue, state) ──► PULL queries │
│ ├─ (run_at) ──► Delayed job scheduler │
│ ├─ (queue, unique_key) ──► Deduplication │
│ └─ (custom_id) ──► Idempotency │
│ │
│ job_results │
│ ├─ job_id: TEXT PRIMARY KEY │
│ ├─ result: BLOB (MessagePack) │
│ └─ completed_at: INTEGER │
│ │
│ dlq │
│ ├─ id: TEXT PRIMARY KEY │
│ ├─ job_id: TEXT │
│ ├─ queue: TEXT │
│ └─ entry: BLOB (full DlqEntry, MessagePack) │
│ │
│ cron_jobs │
│ ├─ name: TEXT PRIMARY KEY │
│ ├─ queue: TEXT │
│ ├─ schedule: TEXT │
│ ├─ next_run: INTEGER │
│ └─ executions: INTEGER │
└─────────────────────────────────────────────────────────────┘

Crash Recovery Flow

┌─────────────────────────────────────────────────────────────┐
│ STARTUP RECOVERY │
│ │
│ 1. Load pending jobs (state = waiting/delayed) │
│ └─ For each job: │
│ ├─ Push to appropriate shard queue │
│ └─ Update jobIndex │
│ │
│ 2. Load active jobs (state = active) │
│ └─ For each job: │
│ ├─ Reset to waiting (worker may have died) │
│ └─ Push back to queue │
│ │
│ 3. Load DLQ entries │
│ └─ Restore to in-memory DLQ shards │
│ │
│ 4. Load cron jobs │
│ └─ Populate cron scheduler heap │
└─────────────────────────────────────────────────────────────┘

S3 Backup Flow

┌─────────────────────────────────────────────────────────────┐
│ S3 BACKUP │
│ │
│ Scheduled: every 6 hours (configurable) │
│ │
│ BACKUP: │
│ ┌────────────────────────────────────────────────────────┐│
│ │ 1. Check database file exists ││
│ │ 2. Generate key: backups/bunqueue-{timestamp}.db ││
│ │ 3. Read file as ArrayBuffer ││
│ │ 4. Calculate SHA256 checksum ││
│ │ 5. Upload to S3 (application/x-sqlite3) ││
│ │ 6. Upload metadata.json ││
│ │ 7. Cleanup old backups (keep N most recent) ││
│ └────────────────────────────────────────────────────────┘│
│ │
│ RESTORE: │
│ ┌────────────────────────────────────────────────────────┐│
│ │ 1. Download backup file from S3 ││
│ │ 2. Load metadata.json ││
│ │ 3. Verify SHA256 checksum ││
│ │ 4. Write to database path ││
│ │ 5. Restart server to load ││
│ └────────────────────────────────────────────────────────┘│
│ │
│ Supports: AWS S3, Cloudflare R2, MinIO, DigitalOcean │
└─────────────────────────────────────────────────────────────┘

Flush on Failure

┌─────────────────────────────────────────────────────────────┐
│ ERROR RECOVERY │
│ │
│ flush() fails │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Re-add jobs to buffer │ │
│ │ this.buffer = jobs.concat(this.buffer) │ │
│ │ │ │
│ │ Jobs not lost, will retry on next flush │ │
│ └─────────────────────────────────────────┘ │
│ │
│ On shutdown: │
│ ┌─────────────────────────────────────────┐ │
│ │ Final flush of remaining buffer │ │
│ │ Wait for completion before exit │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Serialization

┌─────────────────────────────────────────────────────────────┐
│ MESSAGEPACK │
│ │
│ Why MessagePack instead of JSON? │
│ • 2-3x faster encoding/decoding │
│ • Smaller payload size │
│ • Binary data support │
│ │
│ Used for: │
│ • Job data BLOB │
│ • DLQ entry BLOB │
│ • TCP protocol payloads │
│ • Job results storage │
└─────────────────────────────────────────────────────────────┘