Introduction
Message queues are essential components in modern distributed systems, enabling asynchronous communication between services. RabbitMQ is one of the most popular message brokers, providing reliable message delivery and supporting multiple messaging patterns.
RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It acts as an intermediary between applications, allowing them to communicate asynchronously by sending and receiving messages through queues.
In this post, we will explore RabbitMQ basics by building a simple job processing system with a publisher and consumer.
Folder Structure
rabbitmq/
├── package.json
├── publisher.js
└── consumer.js
The project uses amqplib, a widely used community-maintained AMQP 0-9-1 client library for Node.js:
// package.json
{
  "name": "rabbitMQ",
  "version": "1.0.0",
  "dependencies": {
    "amqplib": "^0.10.3"
  },
  "scripts": {
    "publish": "node publisher.js",
    "consume": "node consumer.js"
  }
}
The publisher is responsible for sending messages to the queue. Here’s how it works:
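// publisher.js
const amqp = require("amqplib");

// The job input comes from the command line, e.g. `node publisher.js 7`
const msg = { number: process.argv[2] };

connect();

async function connect() {
  let connection;
  try {
    // amqplib reads frameMax and heartbeat from the URL query string;
    // the second argument of connect() is reserved for socket options.
    connection = await amqp.connect("amqp://localhost:5672?frameMax=0&heartbeat=60");
    const channel = await connection.createChannel();
    // Create the "jobs" queue if it does not exist yet
    await channel.assertQueue("jobs");
    // Message bodies are raw bytes, so serialize the object into a Buffer
    channel.sendToQueue("jobs", Buffer.from(JSON.stringify(msg)));
    console.log(`Job sent successfully ${msg.number}`);
    await channel.close();
    await connection.close();
  } catch (ex) {
    console.error(ex);
  }
}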
Publisher Workflow:
- Connection: Establishes a TCP connection to RabbitMQ
- Channel: A virtual connection that handles the actual communication
- Queue Assertion: Ensures the queue exists (creates it if necessary)
- Message Sending: Converts the message to a Buffer and sends it to the queue
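The last two steps of the workflow above can be sketched as small helpers. This is a minimal illustration, not the code from publisher.js: `encodeJob`, `publishJob`, and `fakeChannel` are hypothetical names, the input is converted to a number before encoding (the original sends the raw string), and the `durable` and `persistent` options are assumptions added for illustration. The stand-in channel exists only so the sketch runs without a broker; in the real publisher the channel would come from `connection.createChannel()`.

```javascript
// Hypothetical helper: validate the command-line input and encode it as a JSON Buffer.
function encodeJob(rawNumber) {
  const text = rawNumber === undefined ? "" : String(rawNumber).trim();
  const number = Number(text);
  if (text === "" || !Number.isFinite(number)) {
    throw new Error(`Expected a numeric job input, got: ${rawNumber}`);
  }
  return Buffer.from(JSON.stringify({ number }));
}

// Hypothetical helper: assert the queue, then publish one job to it.
// `channel` only needs assertQueue and sendToQueue, as on an amqplib channel.
async function publishJob(channel, queue, rawNumber) {
  const content = encodeJob(rawNumber); // fail before touching the broker
  await channel.assertQueue(queue, { durable: true });
  // persistent: true marks the message to survive a broker restart
  // (this only helps when the queue itself is durable).
  return channel.sendToQueue(queue, content, { persistent: true });
}

// Stand-in channel so the sketch runs without a broker; it records what was sent.
const sent = [];
const fakeChannel = {
  assertQueue: async (queue) => ({ queue }),
  sendToQueue: (queue, content, options) => {
    sent.push({ queue, body: content.toString(), options });
    return true;
  },
};

publishJob(fakeChannel, "jobs", "7").then(() => {
  console.log(sent[0].body); // {"number":7}
});
```

Validating before publishing keeps malformed jobs out of the queue, so the consumer does not have to deal with them later.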
The consumer processes messages from the queue:
// consumer.js
const amqp = require("amqplib");

connect();

async function connect() {
  let connection;
  try {
    // amqplib reads frameMax and heartbeat from the URL query string;
    // the second argument of connect() is reserved for socket options.
    connection = await amqp.connect("amqp://localhost:5672?frameMax=0&heartbeat=60");
    const channel = await connection.createChannel();
    // Create the "jobs" queue if it does not exist yet
    await channel.assertQueue("jobs");
    channel.consume("jobs", async message => {
      // message is null if the broker cancels this consumer
      if (message === null) return;
      const input = JSON.parse(message.content.toString());
      console.log(`Received job with input ${input.number}`);
      // Demo condition: acknowledge only the job with input 7.
      // Loose equality is deliberate: the input arrives as the string "7".
      if (input.number == 7) {
        channel.ack(message);
        console.log(`Acknowledged message with input ${input.number}`);
      }
    });
    console.log("Waiting for messages...");
    // Handle graceful shutdown
    process.on("SIGINT", async () => {
      console.log("Closing connection...");
      await channel.close();
      await connection.close();
      process.exit(0);
    });
  } catch (ex) {
    console.error(ex);
  }
}
Consumer Workflow:
- Message Consumption: Listens for messages from the “jobs” queue
- Message Parsing: Converts the Buffer back to JSON
- Message Acknowledgment: Only acknowledges messages that meet certain criteria
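- Error Handling: Catches and logs connection and setup errors. Errors thrown inside the message callback (for example, by JSON.parse on a malformed message) are not caught by this try/catch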
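Parsing, acknowledgment, and error handling for a single message can be sketched as one handler. This is an alternative to the demo condition in consumer.js, not the author's code: `handleMessage`, `processJob`, and `recordingChannel` are hypothetical names, and the policy (reject malformed payloads, requeue failed jobs) is an assumption chosen for illustration. It relies on `channel.ack(message)` and `channel.nack(message, allUpTo, requeue)` from amqplib; the stand-in channel only records calls so the sketch runs without a broker.

```javascript
// Hypothetical handler for one delivery; returns a label describing what it did.
// `channel` only needs ack and nack, as on an amqplib channel.
async function handleMessage(channel, message, processJob) {
  // amqplib passes null when the broker cancels the consumer.
  if (message === null) return "cancelled";

  let input;
  try {
    input = JSON.parse(message.content.toString());
  } catch (err) {
    // Malformed payload: reject without requeueing, since retrying cannot fix it.
    channel.nack(message, false, false);
    return "rejected";
  }

  try {
    await processJob(input);
    channel.ack(message); // success: the broker can discard the message
    return "acked";
  } catch (err) {
    // Processing failed: return the message to the queue for another attempt.
    channel.nack(message, false, true);
    return "requeued";
  }
}

// Stand-in channel so the sketch runs without a broker; it records each call.
const calls = [];
const recordingChannel = {
  ack: (message) => calls.push(["ack", message.content.toString()]),
  nack: (message, allUpTo, requeue) =>
    calls.push(["nack", message.content.toString(), requeue]),
};

// Hypothetical job: fail for input 10 to show the requeue path.
const processJob = async (input) => {
  if (input.number === 10) throw new Error("job failed");
};

async function demo() {
  const bodies = ['{"number":7}', "not json", '{"number":10}'];
  const outcomes = [];
  for (const body of bodies) {
    const message = { content: Buffer.from(body) };
    outcomes.push(await handleMessage(recordingChannel, message, processJob));
  }
  return outcomes;
}

demo().then((outcomes) => console.log(outcomes)); // [ 'acked', 'rejected', 'requeued' ]
```

With a real channel, the handler could be registered as `channel.consume("jobs", (message) => handleMessage(channel, message, processJob))`. Requeueing a job that always fails would redeliver it indefinitely, so a real system would likely cap retries or route such messages elsewhere.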
Running the Project
- Start RabbitMQ
docker run --rm --name rabbitmq -p 5672:5672 rabbitmq:3.13.7-alpine
- Install Dependencies
npm install
- Start the Consumer (in one terminal)
npm run consume
- Send Messages (in another terminal)
npm run publish 5
npm run publish 7
npm run publish 10
- Check the console output of the consumer. We will see the message with number 7 being acknowledged.
npm run consume
> rabbitMQ@1.0.0 consume
> node consumer.js
Waiting for messages...
Received job with input 5
Received job with input 7
Acknowledged message with input 7
Received job with input 10
Understanding Message Acknowledgment
In the consumer code, there’s an interesting condition:
if (input.number == 7) {
  channel.ack(message);
}
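This means only messages with number: 7 are acknowledged. The broker keeps every other delivered message and marks it as unacknowledged for as long as the consumer's channel stays open; it does not deliver the message again on that channel.
When the consumer stops and its channel closes, RabbitMQ returns the unacknowledged messages to the queue. If we restart the consumer process, it receives them again.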
npm run consume
> rabbitMQ@1.0.0 consume
> node consumer.js
Waiting for messages...
Received job with input 5
Received job with input 10
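The behavior in the two consumer runs can be reproduced with a toy in-memory model. This is not RabbitMQ or amqplib code: `ToyQueue` and its methods are hypothetical names, and the model simplifies the broker to a ready list plus a map of unacknowledged deliveries. It is meant only to show why 5 and 10 come back after a restart while 7 does not.

```javascript
// Toy in-memory model of one queue and one consumer channel (not RabbitMQ code).
class ToyQueue {
  constructor() {
    this.ready = [];          // messages waiting to be delivered
    this.unacked = new Map(); // delivered but not yet acknowledged, keyed by delivery tag
    this.nextTag = 1;
  }

  publish(body) {
    this.ready.push({ body, redelivered: false });
  }

  // Deliver every ready message to the consumer and track it as unacknowledged.
  deliverAll(onMessage) {
    while (this.ready.length > 0) {
      const msg = this.ready.shift();
      const tag = this.nextTag++;
      this.unacked.set(tag, msg);
      onMessage({ ...msg, tag });
    }
  }

  // Acknowledging a delivery removes it for good.
  ack(tag) {
    this.unacked.delete(tag);
  }

  // Closing the channel returns unacknowledged messages to the ready list.
  closeChannel() {
    for (const msg of this.unacked.values()) {
      this.ready.push({ ...msg, redelivered: true });
    }
    this.unacked.clear();
  }
}

const queue = new ToyQueue();
[5, 7, 10].forEach((number) => queue.publish({ number }));

// First consumer run: everything is delivered, only 7 is acknowledged.
const firstRun = [];
queue.deliverAll((msg) => {
  firstRun.push(msg.body.number);
  if (msg.body.number === 7) queue.ack(msg.tag);
});
queue.closeChannel(); // the consumer process stops

// Second consumer run: only the messages that were never acknowledged come back.
const secondRun = [];
queue.deliverAll((msg) => secondRun.push(msg.body.number));

console.log(firstRun);  // [ 5, 7, 10 ]
console.log(secondRun); // [ 5, 10 ]
```

Because the second run acknowledges nothing either, 5 and 10 would be delivered again on every later restart until some consumer acknowledges them.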
Real-World Applications
This simple example demonstrates patterns used in real-world scenarios:
- Job Processing: Background task execution
- Microservices Communication: Service-to-service messaging
- Event-Driven Architecture: Decoupled system components
- Load Balancing: Multiple consumers processing messages
Conclusion
RabbitMQ provides a robust foundation for building distributed systems with reliable message delivery. This simple publisher-consumer pattern demonstrates the core concepts that power many modern applications.
The key takeaway is that message queues enable loose coupling between services, allowing them to communicate asynchronously and handle failures gracefully.