Application Layer Architecture
The application layer orchestrates all queue operations, coordinating between the client layer and domain layer.
Module Structure
src/application/
├── queueManager.ts          # Central orchestrator
├── operations/              # PUSH, PULL, ACK, Query
├── dlqManager.ts            # Dead letter queue
├── eventsManager.ts         # Event pub/sub
├── workerManager.ts         # Worker tracking
├── backgroundTasks.ts       # Task orchestration
├── stallDetection.ts        # Stall detection
└── dependencyProcessor.ts   # Dependency resolution

QueueManager Orchestration
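The STATE block in the diagram below lists several capped in-memory structures, such as `completedJobs: BoundedSet (50k)`. As a minimal sketch of how such a bounded set could behave, the following assumes oldest-first eviction. The class body and the eviction policy are assumptions for illustration; the source only gives the name and the cap.

```typescript
// Hypothetical sketch of a size-capped set. Oldest-first eviction is an
// assumption; the source only states that completedJobs is bounded at 50k.
class BoundedSet<T> {
  private readonly items = new Set<T>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    if (maxSize <= 0) throw new RangeError("maxSize must be positive");
    this.maxSize = maxSize;
  }

  add(value: T): void {
    if (this.items.has(value)) return;
    if (this.items.size >= this.maxSize) {
      // A JavaScript Set iterates in insertion order, so the first
      // value yielded is the oldest one still stored.
      const oldest = this.items.values().next().value as T;
      this.items.delete(oldest);
    }
    this.items.add(value);
  }

  has(value: T): boolean {
    return this.items.has(value);
  }

  get size(): number {
    return this.items.size;
  }
}
```

A cap like this keeps memory flat under sustained load, at the cost that very old completions can no longer be looked up in memory.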
┌─────────────────────────────────────────────────────────────┐
│                        QUEUE MANAGER                        │
│                                                             │
│  ┌────────────────────────────────────────────────────────┐ │
│  │                         STATE                          │ │
│  │                                                        │ │
│  │  shards[N] ◄──► shardLocks[N]  (N = auto-detected)     │ │
│  │  processingShards[N] ◄──► processingLocks[N]           │ │
│  │                                                        │ │
│  │  jobIndex: Map<id, location>                           │ │
│  │  completedJobs: BoundedSet (50k)                       │ │
│  │  jobResults: LRU (5k)                                  │ │
│  │  customIdMap: LRU (50k)                                │ │
│  └────────────────────────────────────────────────────────┘ │
│                                                             │
│  ┌────────────────────────────────────────────────────────┐ │
│  │                       OPERATIONS                       │ │
│  │                                                        │ │
│  │  push() ──► operations/push.ts                         │ │
│  │  pull() ──► operations/pull.ts                         │ │
│  │  ack()  ──► operations/ack.ts                          │ │
│  │  query  ──► operations/queryOperations.ts              │ │
│  └────────────────────────────────────────────────────────┘ │
│                                                             │
│  ┌────────────────────────────────────────────────────────┐ │
│  │                        MANAGERS                        │ │
│  │                                                        │ │
│  │  DLQManager     │ EventsManager │ WorkerManager        │ │
│  │  WebhookManager │ StatsManager  │ JobLogsManager       │ │
│  └────────────────────────────────────────────────────────┘ │
│                                                             │
│  ┌────────────────────────────────────────────────────────┐ │
│  │                    BACKGROUND TASKS                    │ │
│  │                                                        │ │
│  │  cleanup │ stall │ dependency │ dlq │ cron             │ │
│  └────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

PUSH Operation Flow
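Step 3 of the flow below picks a shard with `fnv1aHash(queue) & SHARD_MASK`. A minimal sketch of that selection follows, assuming a 32-bit FNV-1a hash and a power-of-two shard count (which the bit mask implies). `SHARD_COUNT = 16` is a placeholder, since the real count is auto-detected, and `shardIndex` is a hypothetical helper name.

```typescript
// 32-bit FNV-1a. This version hashes UTF-16 code units, which matches the
// byte-wise algorithm for ASCII queue names; that simplification is an
// assumption made to keep the sketch dependency-free.
function fnv1aHash(input: string): number {
  let hash = 0x811c9dc5; // FNV-1a 32-bit offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // multiply by the 32-bit FNV prime
  }
  return hash >>> 0; // reinterpret the result as an unsigned 32-bit value
}

// Placeholder shard count (assumed). It must be a power of two so that
// `& SHARD_MASK` is equivalent to `% SHARD_COUNT`.
const SHARD_COUNT = 16;
const SHARD_MASK = SHARD_COUNT - 1;

// Hypothetical helper: maps a queue name to its shard index.
function shardIndex(queue: string): number {
  return fnv1aHash(queue) & SHARD_MASK;
}
```

Because the index depends only on the queue name, every push to the same queue lands on the same shard and contends for the same lock.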
push(queue, input)
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  1. Generate UUIDv7 ID                                      │
│  2. Check customId idempotency (customIdMap)                │
│     └─ If exists: return existing job                       │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  3. Acquire shard write lock                                │
│     shardIdx = fnv1aHash(queue) & SHARD_MASK                │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  4. Check unique key deduplication                          │
│     │                                                       │
│     ├─ Key available? ──► Register key, continue            │
│     └─ Key exists?                                          │
│         ├─ strategy: replace ──► Remove old, insert new     │
│         ├─ strategy: extend  ──► Reset TTL, return existing │
│         └─ default           ──► Return existing            │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  5. Check dependencies                                      │
│     │                                                       │
│     ├─ All satisfied? ──► Push to queue                     │
│     └─ Not satisfied? ──► Add to waitingDeps                │
│                           Register in dependencyIndex       │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  6. Update jobIndex                                         │
│  7. Persist to SQLite (buffered or durable)                 │
│  8. Notify waiters (wake long poll)                         │
│  9. Broadcast 'pushed' event                                │
└─────────────────────────────────────────────────────────────┘

PULL Operation Flow
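Step 5 of the loop below pops a job but skips any that cannot run yet. As a rough sketch of those skip rules, the following scans an array that is assumed to be already ordered by priority; a real implementation would pop from a heap. The `Job` fields and the `pickNextJob` name are hypothetical, and marking the FIFO group as active on pickup is an assumption inferred from the ACK flow releasing it.

```typescript
// Hypothetical job shape; field names are illustrative only.
interface Job {
  id: string;
  runAt: number;       // earliest time (ms) the job may run
  expiresAt?: number;  // TTL deadline (ms), if any
  groupId?: string;    // FIFO group, if any
}

// Returns the first runnable job and removes it from `queue`,
// or null when nothing is runnable right now.
function pickNextJob(
  queue: Job[],
  activeGroups: Set<string>,
  now: number,
): Job | null {
  for (const [i, job] of queue.entries()) {
    // TTL expired: skip, try next.
    if (job.expiresAt !== undefined && job.expiresAt <= now) continue;
    // Not ready (delayed): skip, try next.
    if (job.runAt > now) continue;
    // FIFO group already has a job in flight: skip, try next.
    if (job.groupId !== undefined && activeGroups.has(job.groupId)) continue;

    queue.splice(i, 1);
    // Assumption: picking a grouped job marks its group active until ACK.
    if (job.groupId !== undefined) activeGroups.add(job.groupId);
    return job;
  }
  return null;
}
```

When this returns `null`, the flow below waits for a push notification (bounded by the timeout) and retries instead of polling.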
pull(queue, timeoutMs)
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  LOOP:                                                      │
│                                                             │
│  1. Acquire shard read lock                                 │
│  2. Check queue paused? ──► return null                     │
│  3. Check rate limit    ──► return null if exceeded         │
│  4. Check concurrency   ──► return null if at limit         │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  5. Pop from priority queue                                 │
│     │                                                       │
│     ├─ TTL expired?         ──► Skip, try next              │
│     ├─ Not ready (delayed)? ──► Skip, try next              │
│     ├─ FIFO group active?   ──► Skip, try next              │
│     └─ Valid job            ──► Continue                    │
└─────────────┬───────────────────────────────────────────────┘
              │
       ┌──────┴─────────────┐
       │                    │
   Job found             No job
       │                    │
       ▼                    ▼
┌─────────────┐  ┌─────────────────────────┐
│ Move to     │  │ Wait for notification   │
│ processing  │  │ (event-based, timeout)  │
│ shard       │  └──────────┬──────────────┘
└──────┬──────┘             │
       │                    └───► Retry loop
       ▼
┌─────────────────────────────────────────────────────────────┐
│  6. Create lock token (if useLocks enabled)                 │
│  7. Update jobIndex to 'processing'                         │
│  8. Mark active in SQLite                                   │
│  9. Broadcast 'pulled' event                                │
│  10. Return job with token                                  │
└─────────────────────────────────────────────────────────────┘

ACK Operation Flow
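The steps in the diagram below can be sketched as a single function over in-memory state. This is an illustration under assumptions: the `QueueState` and `ProcessingEntry` shapes are hypothetical, SQLite persistence, event broadcast, and webhooks are omitted, and a plain `Map` stands in for the LRU result cache.

```typescript
// Hypothetical shapes for illustration only.
interface ProcessingEntry {
  token?: string;
  uniqueKey?: string;
  groupId?: string;
}

interface QueueState {
  processing: Map<string, ProcessingEntry>;
  uniqueKeys: Set<string>;
  activeGroups: Set<string>;
  activeCount: number;
  jobResults: Map<string, unknown>; // stand-in for the LRU cache
  completedJobs: Set<string>;
  pendingDepChecks: Set<string>;
}

function ack(state: QueueState, jobId: string, result: unknown, token?: string): void {
  const entry = state.processing.get(jobId);
  if (entry === undefined) throw new Error(`job ${jobId} is not in processing`);

  // 1. Verify the lock token, but only when the caller provided one.
  if (token !== undefined && entry.token !== token) {
    throw new Error("token invalid");
  }

  // 2. Remove from the processing shard.
  state.processing.delete(jobId);

  // 3. Release shard resources.
  if (entry.uniqueKey !== undefined) state.uniqueKeys.delete(entry.uniqueKey);
  if (entry.groupId !== undefined) state.activeGroups.delete(entry.groupId);
  state.activeCount = Math.max(0, state.activeCount - 1);

  // 4. Finalize: store the result and mark the job completed.
  state.jobResults.set(jobId, result);
  state.completedJobs.add(jobId);

  // 5. Queue a dependency check so waiting jobs can be woken.
  state.pendingDepChecks.add(jobId);
}
```

Verifying the token before touching any state means a rejected ACK leaves the job in processing, where stall detection can still recover it.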
ack(jobId, result, token)
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  1. Verify lock token (if provided)                         │
│     └─ Mismatch? ──► Error: token invalid                   │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  2. Remove from processing shard                            │
│     procIdx = fnv1aHash(jobId) & SHARD_MASK                 │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  3. Release shard resources                                 │
│     ├─ Release unique key                                   │
│     ├─ Release FIFO group                                   │
│     └─ Release concurrency slot                             │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  4. Finalize                                                │
│     ├─ Store result in jobResults (LRU)                     │
│     ├─ Store result in SQLite                               │
│     ├─ Update jobIndex to 'completed'                       │
│     └─ Add to completedJobs (signals deps)                  │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│  5. Add to pendingDepChecks (wake dependents)               │
│  6. Broadcast 'completed' event                             │
│  7. Trigger webhooks                                        │
└─────────────────────────────────────────────────────────────┘

Background Tasks
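The failure handling noted at the bottom of the diagram below (log a CRITICAL warning after 5 consecutive failures, but keep retrying) can be sketched as a small wrapper around each task. The names `TaskState` and `runGuarded` are hypothetical, and logging on every failure at or past the threshold is an assumption.

```typescript
const FAILURE_THRESHOLD = 5; // from the diagram: 5 consecutive failures

// Hypothetical per-task bookkeeping.
interface TaskState {
  name: string;
  consecutiveFailures: number;
}

// Runs one tick of a task. Errors never propagate, so the scheduler
// keeps calling the task on its normal interval.
function runGuarded(
  state: TaskState,
  task: () => void,
  log: (message: string) => void,
): void {
  try {
    task();
    state.consecutiveFailures = 0; // any success resets the streak
  } catch (err) {
    state.consecutiveFailures++;
    if (state.consecutiveFailures >= FAILURE_THRESHOLD) {
      log(
        `CRITICAL: task ${state.name} failed ` +
          `${state.consecutiveFailures} times in a row: ${String(err)}`,
      );
    }
  }
}

// Usage sketch (not executed here): one interval per task, e.g.
//   const stall = { name: "stall", consecutiveFailures: 0 };
//   setInterval(() => runGuarded(stall, checkStalls, console.error), 5_000);
// where checkStalls is a hypothetical task function.
```

Unlike a classic circuit breaker, this never stops calling the task; the threshold only escalates the log level.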
┌─────────────────────────────────────────────────────────────┐
│                  BACKGROUND TASK SCHEDULER                  │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │ Cleanup      │  │ Stall Check  │  │ Dependency   │       │
│  │ every 10s    │  │ every 5s     │  │ every 100ms  │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │ DLQ Maint    │  │ Lock Expire  │  │ Cron         │       │
│  │ every 60s    │  │ every 5s     │  │ every 1s     │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
│                                                             │
│  Circuit breaker: After 5 consecutive failures,             │
│  log CRITICAL warning but continue retrying                 │
└─────────────────────────────────────────────────────────────┘

Stall Detection (Two-Phase)
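The two-phase check in the diagram below can be sketched as one function that is called on every tick. A job is only acted on if it was flagged on the previous tick and is still silent on this one. The `ProcessingJob` shape and the default `maxStalls = 3` are assumptions; the 30s `stallInterval` comes from the diagram.

```typescript
// Hypothetical shapes for illustration only.
interface ProcessingJob {
  id: string;
  lastHeartbeat: number; // ms timestamp
  stallCount: number;
}

interface StallOutcome {
  retried: string[];
  deadLettered: string[];
}

function stallCheck(
  processing: Map<string, ProcessingJob>,
  candidates: Set<string>,
  now: number,
  stallInterval = 30_000,
  maxStalls = 3, // assumed default
): StallOutcome {
  const outcome: StallOutcome = { retried: [], deadLettered: [] };

  // Phase 1: confirm candidates flagged on the previous tick.
  for (const id of candidates) {
    const job = processing.get(id);
    if (job === undefined) continue; // finished in the meantime
    if (now - job.lastHeartbeat <= stallInterval) continue; // recovered
    processing.delete(id);
    if (job.stallCount < maxStalls) {
      job.stallCount++;
      outcome.retried.push(id);
    } else {
      outcome.deadLettered.push(id);
    }
  }
  candidates.clear();

  // Phase 2: flag newly silent jobs; they are confirmed on the next tick.
  for (const job of processing.values()) {
    if (now - job.lastHeartbeat > stallInterval) candidates.add(job.id);
  }
  return outcome;
}
```

A worker that misses one heartbeat window because of a GC pause gets a full extra tick to report in before its job is retried.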
┌─────────────────────────────────────────────────────────────┐
│                   STALL CHECK (every 5s)                    │
│                                                             │
│  PHASE 1: Process previous candidates                       │
│  ┌────────────────────────────────────────────────────────┐ │
│  │ For each job in stalledCandidates:                     │ │
│  │   ├─ Still in processing?                              │ │
│  │   ├─ Get stall config                                  │ │
│  │   └─ If confirmed stalled:                             │ │
│  │       ├─ stallCount <  maxStalls ──► Increment + retry │ │
│  │       └─ stallCount >= maxStalls ──► Move to DLQ       │ │
│  └────────────────────────────────────────────────────────┘ │
│                                                             │
│  PHASE 2: Mark new candidates                               │
│  ┌────────────────────────────────────────────────────────┐ │
│  │ For each job in processingShards:                      │ │
│  │   ├─ No heartbeat for > stallInterval (30s)?           │ │
│  │   └─ Add to stalledCandidates (check next tick)        │ │
│  └────────────────────────────────────────────────────────┘ │
│                                                             │
│  Why two-phase? Prevents false positives from               │
│  transient delays (GC pause, network hiccup)                │
└─────────────────────────────────────────────────────────────┘

Dependency Resolution
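The reverse-index lookup in the diagram below can be sketched as follows. The point is that each tick only examines jobs waiting on something that just completed, instead of scanning every waiting job. The `WaitingJob` shape and the function name are hypothetical, and the per-shard grouping and locking from step 3 are omitted for brevity.

```typescript
// Hypothetical shape for illustration only.
interface WaitingJob {
  id: string;
  dependsOn: string[];
}

// Returns the ids of jobs whose dependencies are now all satisfied.
function processDependencies(
  pendingDepChecks: Set<string>,
  dependencyIndex: Map<string, Set<string>>, // depId -> jobs waiting on it
  waitingDeps: Map<string, WaitingJob>,
  completedJobs: Set<string>,
): string[] {
  // 1. Drain the ids that completed since the last tick.
  const completedIds = Array.from(pendingDepChecks);
  pendingDepChecks.clear();

  const ready: string[] = [];
  for (const completedId of completedIds) {
    // 2. Reverse lookup: which jobs were waiting on this one?
    const waiters = dependencyIndex.get(completedId);
    if (waiters === undefined) continue;

    // Iterate over a copy because the index is mutated below.
    for (const jobId of Array.from(waiters)) {
      const job = waitingDeps.get(jobId);
      if (job === undefined) continue;
      // 4. ALL dependencies must be completed.
      if (!job.dependsOn.every((depId) => completedJobs.has(depId))) continue;

      // 5. Remove from waitingDeps and unregister from the index.
      waitingDeps.delete(jobId);
      for (const depId of job.dependsOn) {
        const set = dependencyIndex.get(depId);
        set?.delete(jobId);
        if (set !== undefined && set.size === 0) dependencyIndex.delete(depId);
      }
      ready.push(jobId); // the caller pushes these to the active queue
    }
  }
  return ready;
}
```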
┌─────────────────────────────────────────────────────────────┐
│             DEPENDENCY PROCESSOR (every 100ms)              │
│                                                             │
│  1. Collect completedIds from pendingDepChecks              │
│     (Set of jobs that completed since last check)           │
│                                                             │
│  2. For each completedId:                                   │
│     └─ Look up dependencyIndex[completedId]                 │
│         └─ Returns: Set<jobIds waiting for this job>        │
│                                                             │
│  3. Group by shard for efficient locking                    │
│                                                             │
│  4. For each waiting job:                                   │
│     └─ Check: ALL dependencies completed?                   │
│         └─ completedJobs.has(depId) for all deps            │
│                                                             │
│  5. If all satisfied:                                       │
│     ├─ Remove from waitingDeps                              │
│     ├─ Unregister from dependencyIndex                      │
│     └─ Push to active queue                                 │
└─────────────────────────────────────────────────────────────┘

Cleanup Tasks
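Step 2 in the diagram below rebuilds a priority queue once more than 20% of its entries are stale. A minimal sketch of that threshold check follows, assuming "stale" means entries that were logically removed but are still physically present in the heap. The function names are hypothetical, and a plain array stands in for the heap.

```typescript
const STALE_RATIO_THRESHOLD = 0.2; // from the diagram: stale ratio > 20%

// True when the share of stale entries exceeds the threshold.
function shouldCompact(totalEntries: number, staleEntries: number): boolean {
  if (totalEntries === 0) return false;
  return staleEntries / totalEntries > STALE_RATIO_THRESHOLD;
}

// Returns a rebuilt array without stale entries when compaction is
// warranted, otherwise returns the original array untouched.
function compactIfNeeded<T>(entries: T[], isStale: (entry: T) => boolean): T[] {
  const staleCount = entries.filter(isStale).length;
  if (!shouldCompact(entries.length, staleCount)) return entries;
  return entries.filter((entry) => !isStale(entry));
}
```

Deferring the rebuild until the ratio is exceeded keeps individual removals cheap and amortizes the cost of the rebuild over many operations.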
┌─────────────────────────────────────────────────────────────┐
│                     CLEANUP (every 10s)                     │
│                                                             │
│  1. Refresh delayed counts in each shard                    │
│                                                             │
│  2. Compact priority queues                                 │
│     └─ If stale ratio > 20%: rebuild heap                   │
│                                                             │
│  3. Clean orphaned processing entries                       │
│     └─ Jobs stuck > 30min with no heartbeat                 │
│                                                             │
│  4. Clean stale waiting dependencies                        │
│     └─ Waiting > 1 hour                                     │
│                                                             │
│  5. Clean expired unique keys                               │
│                                                             │
│  6. Clean orphaned job index entries                        │
│                                                             │
│  7. Remove empty queues                                     │
└─────────────────────────────────────────────────────────────┘

Event Broadcasting
┌─────────────────────────────────────────────────────────────┐
│                       EVENTS MANAGER                        │
│                                                             │
│  Event occurs (completed, failed, progress, stalled)        │
│       │                                                     │
│       ▼                                                     │
│  broadcast(event)                                           │
│       │                                                     │
│       ├──► Notify all subscribers (Set-based, O(1) add)     │
│       ├──► Trigger matching webhooks                        │
│       └──► Wake completion waiters                          │
│                                                             │
│  Event-based waiting (no polling):                          │
│  waitForJobCompletion(jobId, timeout)                       │
│  └─ Resolved when 'completed' event for jobId               │
└─────────────────────────────────────────────────────────────┘
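The event-based waiting shown above can be sketched with a promise that is resolved from inside `broadcast`. This is an illustration under assumptions: the class body, the `JobEvent` shape, and resolving to `false` on timeout are all hypothetical, and webhook delivery is omitted.

```typescript
// Hypothetical event shape; the event names come from the diagram.
interface JobEvent {
  type: "completed" | "failed" | "progress" | "stalled";
  jobId: string;
}

type Subscriber = (event: JobEvent) => void;

class EventsManager {
  private readonly subscribers = new Set<Subscriber>();
  private readonly completionWaiters = new Map<string, Set<() => void>>();

  // Returns an unsubscribe function.
  subscribe(fn: Subscriber): () => void {
    this.subscribers.add(fn);
    return () => {
      this.subscribers.delete(fn);
    };
  }

  broadcast(event: JobEvent): void {
    for (const fn of this.subscribers) fn(event);
    if (event.type === "completed") {
      // Wake everyone waiting on this job, then forget them.
      const waiters = this.completionWaiters.get(event.jobId);
      this.completionWaiters.delete(event.jobId);
      waiters?.forEach((wake) => wake());
    }
  }

  // Resolves true on completion, false on timeout (assumed contract).
  waitForJobCompletion(jobId: string, timeoutMs: number): Promise<boolean> {
    return new Promise((resolve) => {
      const waiters = this.completionWaiters.get(jobId) ?? new Set<() => void>();
      this.completionWaiters.set(jobId, waiters);

      const timer = setTimeout(() => {
        waiters.delete(wake);
        if (waiters.size === 0 && this.completionWaiters.get(jobId) === waiters) {
          this.completionWaiters.delete(jobId);
        }
        resolve(false);
      }, timeoutMs);

      const wake = (): void => {
        clearTimeout(timer);
        resolve(true);
      };
      waiters.add(wake);
    });
  }
}
```

Because the waiter is registered synchronously inside the promise executor, a `completed` event broadcast right after the call is not missed, and no polling loop is needed.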