RabbitMQ: One broker to queue them all

  1. What is RabbitMQ?
  2. Environment Setup
  3. Managing Users and Permissions
  4. Scenario: Producer sends messages, Consumer receives messages
  5. Advanced Features: Ensure Reliable Message Delivery
  6. Best Practices

What is RabbitMQ?

RabbitMQ is an open-source message broker that implements AMQP (Advanced Message Queuing Protocol) and is designed for asynchronous communication in distributed systems. Because applications exchange messages through the broker instead of calling each other directly, they stay decoupled and can scale independently. RabbitMQ has client libraries for many programming languages and provides features such as message persistence, flow control, and clustering. It is widely used in microservice architectures and for asynchronous task processing.

Core Concepts

Concept    Description
Producer   An application that sends messages to an exchange.
Consumer   An application that receives messages from a queue and processes them.
Exchange   A component that applies routing rules and distributes messages to queues based on bindings.
Queue      A buffer that stores messages until they are delivered to consumers.
Binding    A relationship between an exchange and a queue that defines the routing condition (e.g., a routing key).
AMQP       An application-layer protocol that defines communication standards for reliable message passing.
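
To make the relationship between these concepts concrete, here is a minimal in-memory sketch of how a direct exchange might route messages to bound queues. It is a toy model and does not use the RabbitMQ client library; ToyDirectExchange, RoutingDemo, and the routing keys are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of direct-exchange routing. NOT the RabbitMQ client API.
public sealed class ToyDirectExchange
{
    // Routing key -> queues bound to this exchange with that key.
    private readonly Dictionary<string, List<Queue<string>>> _bindings =
        new Dictionary<string, List<Queue<string>>>();

    // Binding: attach a queue to the exchange under a routing key.
    public void Bind(Queue<string> queue, string routingKey)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues))
        {
            queues = new List<Queue<string>>();
            _bindings[routingKey] = queues;
        }
        queues.Add(queue);
    }

    // Producer side: copy the message into every queue whose binding key
    // matches the routing key exactly. Returns the number of queues reached
    // (0 means the message was unroutable).
    public int Publish(string routingKey, string message)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues))
        {
            return 0;
        }
        foreach (var queue in queues)
        {
            queue.Enqueue(message);
        }
        return queues.Count;
    }
}

public static class RoutingDemo
{
    public static void Main()
    {
        var exchange = new ToyDirectExchange();
        var orders = new Queue<string>();
        var audit = new Queue<string>();
        exchange.Bind(orders, "order.created");
        exchange.Bind(audit, "order.created");

        int reached = exchange.Publish("order.created", "Order created");
        int unroutable = exchange.Publish("order.cancelled", "Order cancelled");

        Console.WriteLine($"Matching key reached {reached} queue(s); unknown key reached {unroutable}.");
        // Consumer side: read the message from one of the queues.
        Console.WriteLine($"Consumer reads: {orders.Dequeue()}");
    }
}
```

The producer only knows the exchange and a routing key, and the consumer only knows its queue. The bindings are the one place where the two are connected.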

Environment Setup

Pulling the RabbitMQ Docker Image

docker pull rabbitmq:management

Start a RabbitMQ instance, storing its data in a named volume and setting the default user through environment variables.

docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq \
  -p 15672:15672 \
  -p 5672:5672 \
  -e RABBITMQ_DEFAULT_USER=*** \
  -e RABBITMQ_DEFAULT_PASS=*** \
  rabbitmq:management
  • -id combines -i (keep STDIN open) with -d, which runs the container in detached mode.
  • --name assigns a name to the container.
  • -v mounts the named volume rabbitmq-home at /var/lib/rabbitmq, where RabbitMQ keeps its data.
  • -p maps ports from the container to your host machine. Port 5672 is for AMQP client connections, and 15672 is for the management UI.
  • -e sets the username and password of the default user.

Access the RabbitMQ Management Console:

  • Open a web browser and navigate to http://localhost:15672/.
  • Log in with the username and password you set in the command above.

Managing Users and Permissions

To add a user, use rabbitmqctl add_user, passing the new username and password:

rabbitmqctl add_user 'username' 'password'

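To grant permissions to a user in a virtual host, use rabbitmqctl set_permissions, passing the configure, write, and read patterns in that order:

rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"

To list users in a cluster, use rabbitmqctl list_users: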

rabbitmqctl list_users

To delete a user, use rabbitmqctl delete_user:

rabbitmqctl delete_user 'username'

To revoke permissions from a user in a virtual host, use rabbitmqctl clear_permissions:

rabbitmqctl clear_permissions -p "custom-vhost" "username"

Scenario: Producer sends messages, Consumer receives messages

Producer Code

// Written against the synchronous API of RabbitMQ.Client 6.x (CreateModel).
using System;
using System.Text;
using RabbitMQ.Client;

class Producer
{
    private const string QueueName = "simple_queue";

    public static void SendMessage(string message)
    {
        var factory = new ConnectionFactory()
        {
            HostName = "***",
            VirtualHost = "visual_knitting",
            UserName = "admin",
            Password = "123456",
            Port = 5672
        };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declaring the queue is idempotent: it is created only if it does not exist yet.
            channel.QueueDeclare(
                queue: QueueName,
                durable: false,    // Queue does not survive a broker restart
                exclusive: false,  // Not restricted to this connection
                autoDelete: false, // Keep the queue after the last consumer disconnects
                arguments: null
            );

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(
                exchange: "",          // Default exchange (routes directly to the named queue)
                routingKey: QueueName, // Routing key matches the queue name
                basicProperties: null,
                body: body
            );
            Console.WriteLine($"[Producer] Sent: {message}");
        }
    }
}

Consumer Code

// Written against the synchronous API of RabbitMQ.Client 6.x (CreateModel).
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Consumer
{
    private const string QueueName = "simple_queue";

    public static void StartListening()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "***",
            VirtualHost = "visual_knitting",
            UserName = "admin",
            Password = "123456",
            Port = 5672
        };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same declaration as the producer, so either side can start first.
            channel.QueueDeclare(QueueName, false, false, false, null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"[Consumer] Received: {message}");
            };

            channel.BasicConsume(
                queue: QueueName,
                autoAck: true, // Auto-acknowledge messages upon receipt
                consumer: consumer
            );

            Console.WriteLine("Consumer is waiting for messages. Press any key to exit.");
            Console.ReadKey();
        }
    }
}

Run the test:

Send a message from the Producer. It prints a line of the form:
[Producer] Sent: <message>
Consumer output:
[Consumer] Received: <message>

Advanced Features: Ensure Reliable Message Delivery

Message Persistence

Scenario: Prevent message loss on RabbitMQ restart

  • Queue Persistence: Set durable: true when declaring the queue.
  • Message Persistence: Set BasicProperties.Persistent = true.
    // Producer code for persistence
    channel.QueueDeclare(QueueName, durable: true, ...);

    var properties = channel.CreateBasicProperties();
    properties.Persistent = true; // Make messages persistent
    channel.BasicPublish(..., basicProperties: properties, ...);

Publisher Confirms

Scenario: Ensure messages reach RabbitMQ server successfully

// Enable publisher confirms
channel.ConfirmSelect();

channel.BasicPublish(...);
// WaitForConfirms returns false when the broker nacked a message
if (channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
{
    Console.WriteLine("Message confirmed by server.");
}
else
{
    Console.WriteLine("Message was not confirmed (nacked or timed out).");
}

Manual Consumer Acknowledgment

Scenario: Avoid losing unprocessed messages (e.g., on consumer crash)

// Consumer code with manual acknowledgment (autoAck: false)
var consumer = new EventingBasicConsumer(channel);

// Attach the handler before starting consumption so no delivery is missed
consumer.Received += (model, ea) =>
{
    var deliveryTag = ea.DeliveryTag;
    try
    {
        // Process the message...
        channel.BasicAck(deliveryTag, multiple: false); // Acknowledge single message
    }
    catch (Exception)
    {
        // Requeue the message (a message that always fails will be redelivered repeatedly)
        channel.BasicNack(deliveryTag, multiple: false, requeue: true);
    }
};

channel.BasicConsume(
    queue: QueueName,
    autoAck: false, // Disable auto-acknowledgment
    consumer: consumer
);
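
Why manual acknowledgment prevents loss may be easier to see in a small model. The sketch below is a simplified in-memory illustration, not the RabbitMQ client API: ToyAckQueue and AckDemo are hypothetical names, and the ordering of requeued messages is simplified. The idea it tries to capture is that a delivered message stays tracked as unacknowledged until the consumer answers.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of manual acknowledgment. NOT the RabbitMQ client API.
public sealed class ToyAckQueue
{
    private readonly Queue<string> _ready = new Queue<string>();
    private readonly Dictionary<ulong, string> _unacked = new Dictionary<ulong, string>();
    private ulong _nextTag = 1;

    public int ReadyCount => _ready.Count;
    public int UnackedCount => _unacked.Count;

    public void Publish(string message) => _ready.Enqueue(message);

    // Hand the next ready message to a consumer. The queue keeps a copy
    // under its delivery tag until the consumer acks or nacks it.
    public bool TryDeliver(out ulong deliveryTag, out string message)
    {
        if (_ready.Count == 0)
        {
            deliveryTag = 0;
            message = string.Empty;
            return false;
        }
        message = _ready.Dequeue();
        deliveryTag = _nextTag++;
        _unacked[deliveryTag] = message;
        return true;
    }

    // Ack: processing succeeded, so the queue can forget the message.
    public void Ack(ulong deliveryTag) => _unacked.Remove(deliveryTag);

    // Nack: processing failed. With requeue, the message becomes deliverable again.
    public void Nack(ulong deliveryTag, bool requeue)
    {
        if (_unacked.Remove(deliveryTag, out var message) && requeue)
        {
            _ready.Enqueue(message);
        }
    }
}

public static class AckDemo
{
    public static void Main()
    {
        var queue = new ToyAckQueue();
        queue.Publish("Order created");

        // First attempt fails, so the consumer nacks and the message is requeued.
        queue.TryDeliver(out var tag, out var message);
        queue.Nack(tag, requeue: true);
        Console.WriteLine($"After nack: ready={queue.ReadyCount}, unacked={queue.UnackedCount}");

        // Second attempt succeeds, so the consumer acks and the message is gone.
        queue.TryDeliver(out tag, out message);
        queue.Ack(tag);
        Console.WriteLine($"After ack: ready={queue.ReadyCount}, unacked={queue.UnackedCount}");
    }
}
```

With autoAck: true there is no unacknowledged state at all: the message is dropped from the queue as soon as it is delivered, whether or not processing succeeds.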

Best Practices

Message Format

Use JSON for structured data, e.g.:

{
  "MessageId": "12**",
  "Content": "Order created",
  "Timestamp": "2025-03-22T12:00:00"
}
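
One way to produce and read such a message body in .NET is System.Text.Json. The following is a minimal sketch: OrderMessage and MessageCodec are hypothetical names, the property names are assumed to mirror the JSON fields above, and "msg-001" is a made-up identifier.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message contract mirroring the JSON fields shown above.
public sealed class OrderMessage
{
    public string MessageId { get; set; } = "";
    public string Content { get; set; } = "";
    public DateTime Timestamp { get; set; }
}

public static class MessageCodec
{
    // Serialize to UTF-8 bytes, the form a message body is published in.
    public static byte[] Encode(OrderMessage message) =>
        JsonSerializer.SerializeToUtf8Bytes(message);

    // Parse a received body back into the typed message.
    public static OrderMessage Decode(byte[] body) =>
        JsonSerializer.Deserialize<OrderMessage>(body)
        ?? throw new InvalidOperationException("Message body was JSON null.");
}

public static class JsonDemo
{
    public static void Main()
    {
        var original = new OrderMessage
        {
            MessageId = "msg-001", // made-up identifier
            Content = "Order created",
            Timestamp = new DateTime(2025, 3, 22, 12, 0, 0)
        };

        byte[] body = MessageCodec.Encode(original);
        Console.WriteLine(Encoding.UTF8.GetString(body));

        OrderMessage decoded = MessageCodec.Decode(body);
        Console.WriteLine($"{decoded.MessageId}: {decoded.Content}");
    }
}
```

The byte array returned by Encode could be passed as the body argument of BasicPublish, and Decode could be applied to ea.Body.ToArray() in the consumer's Received handler.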

Monitoring

  • Use the RabbitMQ management dashboard to check queue status, connections, and resource usage.
  • Monitor application performance with dotnet-trace, and broker metrics with tools like Prometheus + Grafana.