Patterns, examples, and best practices for using RabbitMQ from Node.js and TypeScript via amqplib. Includes connection setup, publishing, consuming, exchange types (direct/fanout/topic), prefetch, dead-letter exchanges, confirm channels, reconnect strategy, and TypeScript typings. Use when the user asks: "RabbitMQ Node.js example", "amqplib TypeScript", "how to publish a message to RabbitMQ", "RabbitMQ consumer Node", "connect to RabbitMQ from Node.js", "AMQP queue example", "RabbitMQ exchange types", "dead letter queue Node", or any RabbitMQ / AMQP + Node.js related question.
This skill provides reference material, code snippets and simple helper
functions for interacting with a RabbitMQ broker from a Node.js or TypeScript
application. It assumes usage of the popular amqplib package, which ships with
complete TypeScript definitions.
Install the client library in your project:
npm install amqplib
# or yarn add amqplib
If you are authoring TypeScript code, the types are included implicitly, but
you can ensure the compiler picks them up by having "moduleResolution": "node" in your tsconfig.json.
import amqp, { Connection, Channel, Replies } from 'amqplib';
async function connect(uri = 'amqp://localhost'): Promise<{ conn: Connection; channel: Channel }> {
const conn = await amqp.connect(uri);
const channel = await conn.createChannel();
return { conn, channel };
}
Keep a single connection per process and create channels for logical units of work. Channels are lightweight and can be reopened after errors.
async function publish(channel: Channel, queue: string, msg: string) {
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
}
// exchange example
await channel.assertExchange('logs', 'fanout', { durable: false });
channel.publish('logs', '', Buffer.from('hello world'));
assertQueue/assertExchange to ensure the topology exists.async function consume(channel: Channel, queue: string, handler: (msg: string) => void) {
await channel.assertQueue(queue, { durable: true });
await channel.consume(queue, msg => {
if (msg) {
handler(msg.content.toString());
channel.ack(msg);
}
});
}
ack or nack messages; otherwise they will be re-queued when the
consumer connection closes.channel.prefetch(1) to process one message at a time and avoid
overloading consumers.RabbitMQ supports several exchange types:
| Type | Description |
|---|---|
| direct | routes by exact routing key |
| fanout | broadcasts to all bound queues |
| topic | routes by pattern (e.g. user.*.created) |
| headers | routes by message header values |
Binding queues to exchanges allows flexible routing; call
channel.bindQueue(queue, exchange, routingKey).
amqplib module exports interfaces such as Connection, Channel, and
Message you can import directly.zod or io-ts before publishing.interface TaskPayload { taskId: string; data: any; }
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(payload)));
conn.close() on shutdown to
flush any outstanding confirms.error/close events on the connection and
attempt to reconnect with backoff.await - many methods return promises; neglecting them can lead
to lost messages or unexpected ordering.conn.createConfirmChannel() returns a channel that
provides publisher confirms (channel.waitForConfirms()). Use channel.waitForConfirms()
or the callback form of sendToQueue to know when the broker has accepted
a message.replyTo/correlationId properties. Keep a single reply queue per client
and correlate responses.deadLetterExchange to
handle unprocessable messages, optionally with TTL/expiration for retries.messageTtl or expiration on queues or
properties to auto-expire stale messages.channel.prefetch(n) to control how many unacked
messages are sent to each consumer; useful for batching and flow control.amqp://host1,host2.Many applications keep connection parameters in environment variables:
RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/vhost
RABBITMQ_PREFETCH=5 # optional prefetch count
RABBITMQ_HEARTBEAT=30 # seconds
Pass them to amqp.connect(process.env.RABBITMQ_URL) and call
channel.prefetch(+process.env.RABBITMQ_PREFETCH || 0).
Handle error and close events on both connection and channel objects. A
simple backoff loop looks like:
async function start() {
while (true) {
try {
const { conn, channel } = await connect();
await consume(channel, 'tasks', handler);
break; // consumed successfully
} catch (err) {
console.error('rabbitmq error, retrying in 5s', err);
await new Promise(r => setTimeout(r, 5000));
}
}
}
Libraries like amqp-connection-manager wrap this pattern and automatically
reopen channels when the broker restarts.
(async () => {
const { conn, channel } = await connect();
await publish(channel, 'hello', 'world');
await consume(channel, 'hello', console.log);
// teardown...
await channel.close();
await conn.close();
})();