RabbitMQ Message Broker Setup
Deploy and configure RabbitMQ for reliable message queue handling
What is RabbitMQ?
RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP 0-9-1). Key features:
- Message queuing: Decouple application components with asynchronous messaging
- Routing flexibility: Direct, fanout, topic, and headers exchange types
- Reliability: Message persistence and acknowledgment mechanisms
- High availability: Clustering and replicated queues for fault tolerance
- Management UI: Built-in web dashboard for monitoring and administration
- Flexible delivery: Supports multiple consumer patterns and priority queues
Installation on Ubuntu/Debian
Add RabbitMQ Repository
# Install dependencies
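apt-get install -y curl gnupg apt-transport-https
# Add RabbitMQ GPG key
# (apt-key is deprecated on current Debian and Ubuntu releases, where the key belongs in a keyring file referenced with signed-by)
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/3.13.0/rabbitmq-release-signing-key.asc | apt-key add -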
# Add RabbitMQ repository
echo "deb https://ppa.launchpadcontent.net/rabbitmq/rabbitmq-erlang/ubuntu $(lsb_release -sc) main" | tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb https://ppa.launchpadcontent.net/rabbitmq/rabbitmq-erlang-backports/ubuntu $(lsb_release -sc) main" | tee -a /etc/apt/sources.list.d/rabbitmq.list
echo "deb https://rabbitmq.com/debian $(lsb_release -sc) main" | tee /etc/apt/sources.list.d/rabbitmq-team.list
apt-get update
Install RabbitMQ and Erlang
# Install Erlang (required runtime)
apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-diameter erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-sasl erlang-snmp erlang-ssh erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
# Install RabbitMQ
apt-get install -y rabbitmq-server
# Start service
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
Verify Installation
rabbitmqctl status | head -20
Enable Management Plugin
The management plugin provides a web UI on port 15672:
rabbitmq-plugins enable rabbitmq_management
# Restart RabbitMQ to apply
systemctl restart rabbitmq-server
Access the dashboard at http://localhost:15672 with default credentials:
- Username: guest
- Password: guest
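The management plugin also serves an HTTP API on the same port, which is convenient for scripted health checks. The following is a minimal sketch using only the Python standard library; the helper names build_overview_request and fetch_overview are hypothetical, and the host, port, and guest credentials are the defaults described above.

```python
import base64
import json
import urllib.request


def build_overview_request(host="localhost", port=15672,
                           user="guest", password="guest"):
    """Build an authenticated GET request for the /api/overview endpoint."""
    url = f"http://{host}:{port}/api/overview"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


def fetch_overview(request, timeout=5):
    """Send the request and decode the JSON body (needs a running broker)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Building the request does not touch the network.
request = build_overview_request()
print(request.full_url)
```

With a broker running locally, fetch_overview(request) should return a dictionary of broker-wide statistics. Replace the guest credentials once that account has been removed.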
User and Virtual Host Management
Remove Default Guest User
For production, always remove the default guest account:
rabbitmqctl delete_user guest
Create Admin User
# Create admin user
rabbitmqctl add_user admin SecurePassword123!
# Set administrator tags
rabbitmqctl set_user_tags admin administrator
# Grant all permissions on the default vhost (/); repeat with -p for each additional vhost
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Create Application User
# Create user with limited permissions
rabbitmqctl add_user appuser AppPassword456!
# Grant permissions only on a specific vhost (the /production vhost must already exist; see Create Virtual Host)
rabbitmqctl set_permissions -p /production appuser "^appuser-.*" ".*" ".*"
Verify User Setup
# List all users
rabbitmqctl list_users
# Check user permissions
rabbitmqctl list_permissions -p /production
Virtual Hosts and Permissions
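Permission patterns such as "^appuser-.*" are regular expressions tested against exchange and queue names. The sketch below approximates that check in Python so a pattern can be tried out before it is applied. It assumes unanchored search semantics and treats an empty pattern as "^$". Python's re module is only a stand-in for the broker's own regex engine, and the function name is_allowed is hypothetical.

```python
import re


def is_allowed(pattern: str, resource_name: str) -> bool:
    """Approximate whether a permission regex covers a resource name."""
    # Assumption: an empty pattern grants nothing, like "^$".
    effective = pattern or "^$"
    return re.search(effective, resource_name) is not None


for name in ("appuser-orders", "order_queue", "events"):
    print(name, is_allowed("^appuser-.*", name))
```

Only the first name starts with the appuser- prefix, so it is the only one this pattern covers.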
Create Virtual Host
# Create vhost for production environment
rabbitmqctl add_vhost /production
# Verify creation
rabbitmqctl list_vhosts
Set Granular Permissions
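# Syntax: set_permissions -p vhost username configure write read
# - configure: regex for resources the user may create, delete, or modify
# - write: regex for resources the user may publish to
# - read: regex for resources the user may consume from
# Note: with these patterns appuser can only use resources whose names start with "appuser-";
# the events exchange and order_queue queue in the Python examples need patterns that match their names
rabbitmqctl set_permissions -p /production appuser "^appuser-.*" "^appuser-.*" "^appuser-.*"
Core Concepts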
Exchanges
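The commands below use rabbitmqadmin, the command-line tool that accompanies the management plugin, because rabbitmqctl cannot declare exchanges, queues, or bindings. The syntax shown is that of the Python-based rabbitmqadmin shipped with RabbitMQ 3.x; pass -u and -p if the guest account has been removed.
Direct Exchange: Routes a message to every queue whose binding key exactly matches the routing key
rabbitmqadmin -V /production declare exchange name=orders type=direct
Fanout Exchange: Broadcasts each message to all bound queues, ignoring the routing key
rabbitmqadmin -V /production declare exchange name=notifications type=fanout
Topic Exchange: Routes on dot-separated patterns, where * matches exactly one word and # matches zero or more words (e.g., user.created.*)
rabbitmqadmin -V /production declare exchange name=events type=topic
Queues and Bindings
# Declare queue
rabbitmqadmin -V /production declare queue name=user_events durable=true
# Bind queue to exchange
rabbitmqadmin -V /production declare binding source=events destination=user_events routing_key="user.*"
Python AMQP Client Example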
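Before connecting with a client library, the topic-exchange matching described under Core Concepts can be explored in plain Python. The sketch below is an illustration of the matching rules, not the broker's implementation: keys are split on dots, * stands for exactly one word, and # stands for zero or more words. The function name topic_matches is hypothetical.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic binding key matches a routing key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: match only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for key in ("user.created", "user.created.eu", "order.created"):
    print(key, topic_matches("user.*", key))
```

Under these rules a binding of user.* receives user.created but not user.created.eu, while a binding of user.# would receive both.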
Install pika library:
pip install pika
Publisher Example
import pika
import json
# Connect to RabbitMQ
credentials = pika.PlainCredentials('appuser', 'AppPassword456!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=credentials, virtual_host='/production')
)
channel = connection.channel()
# Declare exchange and queue
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(queue='order_queue', durable=True)
channel.queue_bind(exchange='events', queue='order_queue', routing_key='order.*')
# Publish message
message = {
    'order_id': '12345',
    'customer': 'john@example.com',
    'total': 99.99
}
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent delivery
        content_type='application/json'
    )
)
print("Message published")
connection.close()
Consumer Example
import pika
import json
credentials = pika.PlainCredentials('appuser', 'AppPassword456!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=credentials, virtual_host='/production')
)
channel = connection.channel()
# Declare queue (idempotent)
channel.queue_declare(queue='order_queue', durable=True)
# Set QoS - prefetch one message at a time
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(f"Processing order: {message['order_id']}")
        # Process message
        # ... your business logic ...
        # Acknowledge after successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # Requeue message on error
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# Start consuming
channel.basic_consume(queue='order_queue', on_message_callback=callback)
print("Waiting for messages...")
channel.start_consuming()
Durable and Persistent Messages
Messages survive a broker restart only if the queue is durable and the message is published as persistent:
# In publisher:
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # 2 = persistent, 1 = transient
        content_type='application/json'
    )
)
# In consumer setup:
channel.queue_declare(queue='order_queue', durable=True)
Dead Letter Exchange (DLX)
Route rejected or expired messages to a separate queue:
# Declare main exchange and queue
# Note: a queue that already exists with different arguments cannot be redeclared;
# delete the existing order_queue first or use a new queue name
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(
    queue='order_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_events',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 3600000  # 1 hour TTL; expired messages are dead-lettered too
    }
)
# Declare DLX
channel.exchange_declare(exchange='dlx_events', exchange_type='topic', durable=True)
channel.queue_declare(queue='order_dlq', durable=True)
channel.queue_bind(exchange='dlx_events', queue='order_dlq', routing_key='order.*')
Reject Message to DLX
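A consumer does not have to dead-letter a message on its first failure. One possible policy, offered here as an assumption rather than a recommendation, requeues a failed message once and rejects it without requeue when the redelivered flag shows it has been delivered before. The helper name reject_or_retry is hypothetical; basic_nack, delivery_tag, and redelivered are the names pika exposes on the channel and the delivery method frame. The dry run uses stand-in objects, so no broker is needed.

```python
from types import SimpleNamespace


def reject_or_retry(ch, method):
    """Requeue on the first failure, dead-letter on a repeat failure.

    Returns True if the message was requeued.
    """
    # redelivered is False the first time the broker delivers a message.
    requeue = not method.redelivered
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=requeue)
    return requeue


# Dry run with stand-in objects that record each nack.
nacks = []
fake_channel = SimpleNamespace(
    basic_nack=lambda delivery_tag, requeue: nacks.append((delivery_tag, requeue))
)
reject_or_retry(fake_channel, SimpleNamespace(delivery_tag=1, redelivered=False))
reject_or_retry(fake_channel, SimpleNamespace(delivery_tag=2, redelivered=True))
print(nacks)
```

The redelivered flag is also set when a message returns to the queue because a consumer connection dropped, so this policy can dead-letter a message that has failed in application code only once.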
def callback(ch, method, properties, body):
    try:
        process_message(json.loads(body))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Send to DLX by nacking without requeue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Memory and Disk Management
Configure Memory Thresholds
Edit /etc/rabbitmq/rabbitmq.conf:
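# High watermark: fraction of system RAM at which the broker blocks publishers
vm_memory_high_watermark.relative = 0.6
# Paging ratio: fraction of the high watermark at which queues start paging messages to disk
vm_memory_high_watermark_paging_ratio = 0.75
# Minimum free disk space; publishers are blocked when free space falls below it
disk_free_limit.absolute = 2GB
Restart RabbitMQ:
systemctl restart rabbitmq-server
Monitor Memory Usage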
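To interpret the figures reported by the monitoring commands in this section, it helps to translate the relative settings from rabbitmq.conf into absolute sizes. The sketch below works through that arithmetic for a host with 16 GiB of RAM, which is an assumed figure, using the 0.6 watermark and 0.75 paging ratio configured above. The function name memory_thresholds is hypothetical.

```python
GIB = 1024 ** 3


def memory_thresholds(total_ram_bytes, high_watermark=0.6, paging_ratio=0.75):
    """Return (alarm_bytes, paging_bytes) for the given settings."""
    # Publishers are blocked once the broker uses this much memory.
    alarm_bytes = total_ram_bytes * high_watermark
    # Paging to disk starts at a fraction of the alarm threshold.
    paging_bytes = alarm_bytes * paging_ratio
    return alarm_bytes, paging_bytes


alarm, paging = memory_thresholds(16 * GIB)
print(f"alarm at {alarm / GIB:.1f} GiB, paging from {paging / GIB:.1f} GiB")
```

Paging therefore begins before the memory alarm is raised, which gives the broker a chance to free memory before it has to block publishers.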
# Check memory stats
rabbitmqctl status | grep -A 20 "memory:"
# List queue memory usage
rabbitmqctl list_queues name messages memory
RabbitMQ Clustering
Add Node to Cluster
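On node 2, join the cluster created by node 1. Both nodes must share the same Erlang cookie, and the RabbitMQ service must stay running on node 2 because rabbitmqctl talks to the live node:
# Stop the RabbitMQ application but keep the Erlang node running
rabbitmqctl stop_app
# Reset the node (this erases its existing data) so it can join the cluster
rabbitmqctl reset
# Join cluster (node1 is the seed node)
rabbitmqctl join_cluster rabbit@node1
# Start app
rabbitmqctl start_app
Verify Cluster Status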
# On any node
rabbitmqctl cluster_status
# Expected output shows all nodes
In production, use RabbitMQ with:
- Replicated queues: Quorum queues for automatic failover (classic mirrored queues were removed in RabbitMQ 4.0)
- Persistent storage: Configure disk space limits
- Monitoring: Use Prometheus plugin for metrics
- Network isolation: Secure inter-node communication with SSL/TLS
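For the replicated-queue item in the list above, current RabbitMQ releases provide quorum queues, which a client requests through the x-queue-type argument at declaration time. The sketch below shows that declaration together with the majority arithmetic behind failover. The helper names and the queue name order_queue_quorum are hypothetical, and RecordingChannel stands in for a pika channel so the example runs without a broker.

```python
def declare_quorum_queue(channel, name):
    """Declare a durable quorum queue on a pika-style channel."""
    return channel.queue_declare(
        queue=name,
        durable=True,  # quorum queues must be durable
        arguments={"x-queue-type": "quorum"},
    )


def tolerated_failures(node_count):
    """Nodes that can be lost while a majority is still available."""
    return (node_count - 1) // 2


class RecordingChannel:
    """Stand-in for a pika channel that records declarations."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, **kwargs):
        self.calls.append(kwargs)


channel = RecordingChannel()
declare_quorum_queue(channel, "order_queue_quorum")
print(channel.calls[0]["arguments"])
print(tolerated_failures(3))
```

A three-node cluster keeps a majority with one node down, which is why odd cluster sizes are the usual choice for quorum queues.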
Nginx Reverse Proxy
Protect RabbitMQ management UI behind Nginx:
upstream rabbitmq_management {
    server localhost:15672;
}
server {
    listen 443 ssl http2;
    server_name rabbitmq.example.com;
    ssl_certificate /etc/letsencrypt/live/rabbitmq.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/rabbitmq.example.com/privkey.pem;
    auth_basic "RabbitMQ Management";
    auth_basic_user_file /etc/nginx/.htpasswd;
    location / {
        proxy_pass http://rabbitmq_management;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
Next Steps
- Set up Prometheus monitoring with rabbitmq_prometheus plugin
- Configure log rotation with logrotate
- Implement message TTL and expiration policies
- Design queue topology for your application
- Test failover scenarios in staging environment