Message Queue

Overview

The platform uses BullMQ with a Redis backend for asynchronous job processing. The current primary use case is email delivery – rather than sending emails synchronously during request handling, the application enqueues email jobs that are processed by a dedicated worker process. This decouples email delivery from the API request lifecycle, improving response times and reliability.

Architecture

The message queue system consists of two components:

API Services                    Redis                    Worker Process
  |                               |                           |
  |  EmailClient.send_email_later |                           |
  |  -> MessageQueue.addJob()     |                           |
  |------------------------------>|                           |
  |         (enqueue)             |                           |
  |                               |    Worker polls queue     |
  |                               |<--------------------------|
  |                               |                           |
  |                               |    Job delivered          |
  |                               |-------------------------->|
  |                               |                           |
  |                               |    EmailClient.send_email_now()
  |                               |    -> Resend API          |
  • Producer: libs/message-queue/MessageQueue.ts – a singleton that wraps a BullMQ Queue instance named email-queue.
  • Consumer: libs/message-queue/run.ts – a BullMQ Worker listening on email-queue.
  • Startup: The worker runs as a separate process, started via npm run message-queue.

Producer

Singleton Pattern

The MessageQueue class follows the singleton pattern, ensuring a single shared queue connection across the application. All services that need to enqueue jobs import and use the same instance.

Redis Configuration

The producer’s Redis connection is configured with resilient defaults:

  • Retry strategy: Exponential backoff, capped at a maximum of 30 seconds between retries.
  • Reconnection: Automatically reconnects on READONLY, ECONNREFUSED, and EPIPE errors.
  • Offline queue: Enabled. Commands issued while disconnected are buffered and replayed once the connection is restored.

Methods

addJob({ label, data, opts? })

Enqueues a new job on the email-queue. The data parameter (an object) is serialized to a JSON string before being stored. An optional opts parameter allows BullMQ job options (such as delay, attempts, or priority) to be passed through.

getJob(jobId)

Retrieves a job by its ID from the queue. Useful for checking job status or inspecting job data after enqueueing.

Consumer

Initialization

The worker process begins by importing instrumentation.ts to initialize OpenTelemetry tracing and log export before any other modules are loaded. This ensures all downstream operations are captured by the observability pipeline.

Redis Configuration

The consumer uses a stricter Redis configuration than the producer:

  • Connect timeout: 10 seconds.
  • Max retries: 10 attempts. If all retries are exhausted, the worker gives up rather than retrying indefinitely.
  • Offline queue: Disabled. Commands are not buffered when disconnected, preventing stale job processing.

Job Processor

When a job is dequeued, the processor executes the following steps:

  1. Parses job.data, handling both string and object formats for compatibility.
  2. Checks for a body property within the parsed data.
  3. Calls EmailClient.send_email_now(body, to) to dispatch the email immediately via the Resend API.

Event Handlers

The worker registers handlers for the following events:

  • ready: Logged when the worker successfully connects to Redis and begins processing.
  • error: Logged when an error occurs. Connection-specific errors are distinguished from general errors and include retry context information.
  • failed: Logged when a job fails after exhausting its retry attempts. The log entry includes the job ID for debugging.

Graceful Shutdown

The worker listens for SIGTERM and SIGINT signals. On receipt, it closes the worker connection cleanly, allowing in-progress jobs to complete before the process exits.

Email Sending

The EmailClient class provides two methods for sending email:

send_email_later(emailData)

Adds an email job to the BullMQ queue for asynchronous processing. This is the preferred method for non-urgent emails (such as OTP delivery), as it returns immediately and does not block the calling request handler.

send_email_now(body, to)

Sends an email synchronously via the Resend API. This method is called by the queue worker when processing jobs, and can also be used directly when immediate delivery is required.

Email Templates

Email templates are built using the React Email library (@react-email/components). Templates are defined as React components, allowing type-safe, composable email layouts that render to standards-compliant HTML.

Error Handling

The message queue system employs layered error handling:

  • Connection errors are logged with retry count and timing information, allowing operators to distinguish between transient network issues and persistent connectivity failures.
  • Job failures are logged with the associated job ID, enabling correlation with the original enqueue operation and any upstream request context.
  • Producer resilience: The offline queue ensures that jobs enqueued during brief Redis outages are not lost.
  • Consumer resilience: The strict retry cap prevents the worker from entering an infinite reconnection loop against an unreachable Redis instance.

Code Locations

Component Path
Producer (Queue) libs/message-queue/MessageQueue.ts
Consumer (Worker) libs/message-queue/run.ts
Email client libs/message-queue/EmailClient.ts
Email templates libs/message-queue/templates/