RabbitMQ Message Broker Setup

Deploy and configure RabbitMQ for reliable message queue handling

What is RabbitMQ?

RabbitMQ is an open-source message broker whose core protocol is the Advanced Message Queuing Protocol (AMQP 0-9-1). Key features:

  • Message queuing: Decouple application components with asynchronous messaging
  • Routing flexibility: Direct, fanout, topic, and headers exchange types
  • Reliability: Message persistence and acknowledgment mechanisms
  • High availability: Clustering and replicated queues for fault tolerance
  • Management UI: Built-in web dashboard for monitoring and administration
  • Flexible delivery: Supports multiple consumer patterns and priority queues

Installation on Ubuntu/Debian

Add RabbitMQ Repository

# Install dependencies
apt-get install curl gnupg apt-transport-https

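# Add RabbitMQ GPG key
# (apt-key is deprecated on recent Debian/Ubuntu releases; there, store the key
# under /usr/share/keyrings and reference it with signed-by in the sources list)
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/3.13.0/rabbitmq-release-signing-key.asc | apt-key add -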

# Add RabbitMQ repository
echo "deb https://ppa.launchpadcontent.net/rabbitmq/rabbitmq-erlang/ubuntu $(lsb_release -sc) main" | tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb https://ppa.launchpadcontent.net/rabbitmq/rabbitmq-erlang-backports/ubuntu $(lsb_release -sc) main" | tee -a /etc/apt/sources.list.d/rabbitmq.list
echo "deb https://rabbitmq.com/debian $(lsb_release -sc) main" | tee /etc/apt/sources.list.d/rabbitmq-team.list

apt-get update

Install RabbitMQ and Erlang

# Install Erlang (required runtime)
apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-diameter erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-sasl erlang-snmp erlang-ssh erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

# Install RabbitMQ
apt-get install -y rabbitmq-server

# Start service
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

Verify Installation

rabbitmqctl status | head -20

Enable Management Plugin

The management plugin provides a web UI on port 15672:

rabbitmq-plugins enable rabbitmq_management

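# The plugin is activated on the running node, so a restart is normally
# not required; restart only if the UI does not come up
systemctl restart rabbitmq-server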

Access the dashboard at http://localhost:15672 with the default credentials (by default, the guest account can only log in from localhost):

  • Username: guest
  • Password: guest

User and Virtual Host Management

Remove Default Guest User

For production, always remove the default guest account:

rabbitmqctl delete_user guest

Create Admin User

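# Create admin user (replace the example password with your own)
rabbitmqctl add_user admin 'SecurePassword123!'

# Set administrator tags
rabbitmqctl set_user_tags admin administrator

# Grant all permissions on the default vhost (/).
# Permissions are per vhost; repeat with -p <vhost> for any other vhost.
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"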

Create Application User

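# Create application user (replace the example password with your own)
rabbitmqctl add_user appuser 'AppPassword456!'

# Grant permissions only on a specific vhost. The vhost must already exist
# (see "Create Virtual Host" below). Here appuser may configure only
# resources named appuser-*, but may write to and read from any resource.
rabbitmqctl set_permissions -p /production appuser "^appuser-.*" ".*" ".*"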

Verify User Setup

# List all users
rabbitmqctl list_users

# Check user permissions
rabbitmqctl list_permissions -p /production

Virtual Hosts and Permissions

Create Virtual Host

# Create vhost for production environment
rabbitmqctl add_vhost /production

# Verify creation
rabbitmqctl list_vhosts
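
Because this vhost name begins with a slash, it has to be percent-encoded wherever it appears in a URL, such as an AMQP connection URI or a management HTTP API path. The sketch below builds both with the standard library only. The helper names, the host, and the ports (5672 for AMQP, 15672 for the management API) are assumptions for illustration.

```python
from urllib.parse import quote


def amqp_uri(user: str, password: str, host: str, vhost: str, port: int = 5672) -> str:
    """Build an AMQP URI with user, password, and vhost percent-encoded."""
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


def management_queues_url(host: str, vhost: str, port: int = 15672) -> str:
    """Build the management API URL that lists the queues of one vhost."""
    return f"http://{host}:{port}/api/queues/{quote(vhost, safe='')}"


# "/production" becomes "%2Fproduction"; the "!" in the password becomes "%21"
print(amqp_uri("appuser", "AppPassword456!", "localhost", "/production"))
print(management_queues_url("localhost", "/production"))
```

A URI of this shape can be passed to pika.URLParameters instead of building ConnectionParameters by hand.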

Set Granular Permissions

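# Syntax: set_permissions -p vhost username configure write read
# - configure: queue/exchange declaration
# - write: message publishing
# - read: message consumption
#
# Each value is a regular expression matched against resource names.
# With the patterns below, appuser can only use queues and exchanges whose
# names start with appuser-, so names such as events or order_queue (used in
# the client examples later) are refused unless the patterns are widened.

rabbitmqctl set_permissions -p /production appuser "^appuser-.*" "^appuser-.*" "^appuser-.*"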
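
To see which resource names a permission pattern would cover, the pattern can be tried locally before applying it. The sketch below approximates the broker's check with Python's re.search. This is an assumption: RabbitMQ evaluates the patterns with Erlang's regex engine, which may differ in edge cases, and the helper name is hypothetical.

```python
import re


def allowed(pattern: str, resource_name: str) -> bool:
    """Approximate whether a permission regex covers a resource name."""
    # RabbitMQ documents the empty pattern as a synonym for '^$'
    if pattern == "":
        pattern = "^$"
    return re.search(pattern, resource_name) is not None


pattern = "^appuser-.*"
for name in ["appuser-orders", "events", "order_queue"]:
    verdict = "allowed" if allowed(pattern, name) else "refused"
    print(f"{name}: {verdict}")
```

Only appuser-orders passes this check, which is why queue and exchange names need to follow the prefix convention chosen for the user.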

Core Concepts

Exchanges

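rabbitmqctl has no commands for declaring exchanges. The examples below use rabbitmqadmin, the command-line tool that ships with the management plugin (classic syntax shown). Run it as a user that has permissions on the /production vhost.

Direct Exchange: Route messages to queues whose binding key exactly matches the routing key

rabbitmqadmin -u admin -p 'SecurePassword123!' -V /production declare exchange name=orders type=direct

Fanout Exchange: Broadcast messages to all bound queues, ignoring the routing key

rabbitmqadmin -u admin -p 'SecurePassword123!' -V /production declare exchange name=notifications type=fanout

Topic Exchange: Route based on pattern matching against dot-separated routing keys (e.g., user.created.*), where * matches exactly one word and # matches zero or more words

rabbitmqadmin -u admin -p 'SecurePassword123!' -V /production declare exchange name=events type=topic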
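
Topic patterns are easy to get subtly wrong, so it can help to check them offline. The following is a minimal sketch of the topic matching rules (* matches exactly one dot-separated word, # matches zero or more words). It is an illustration written for this guide, not the broker's implementation, and the function name is hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


for key in ["user.created", "user.created.eu", "user"]:
    print(key, topic_matches("user.*", key), topic_matches("user.#", key))
```

With these rules, user.* matches user.created but neither user.created.eu nor user, while user.# matches all three.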

Queues and Bindings

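# rabbitmqctl cannot declare queues or bindings; use rabbitmqadmin
# (ships with the management plugin, classic syntax shown)

# Declare queue
rabbitmqadmin -u admin -p 'SecurePassword123!' -V /production declare queue name=user_events durable=true

# Bind queue to exchange
rabbitmqadmin -u admin -p 'SecurePassword123!' -V /production declare binding source=events destination=user_events routing_key="user.*"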

Python AMQP Client Example

Install pika library:

pip install pika

Publisher Example

import pika
import json

# Connect to RabbitMQ
credentials = pika.PlainCredentials('appuser', 'AppPassword456!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=credentials, virtual_host='/production')
)
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(queue='order_queue', durable=True)
channel.queue_bind(exchange='events', queue='order_queue', routing_key='order.*')

# Publish message
message = {
    'order_id': '12345',
    'customer': 'john@example.com',
    'total': 99.99
}

channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent delivery
        content_type='application/json'
    )
)

print("Message published")
connection.close()

Consumer Example

import pika
import json

credentials = pika.PlainCredentials('appuser', 'AppPassword456!')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=credentials, virtual_host='/production')
)
channel = connection.channel()

# Declare queue (idempotent)
channel.queue_declare(queue='order_queue', durable=True)

# Set QoS - prefetch one message at a time
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(f"Processing order: {message['order_id']}")

        # Process message
        # ... your business logic ...

        # Acknowledge after successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
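    except Exception as e:
        print(f"Error: {e}")
        # Requeue message on error. A message that always fails will be
        # redelivered indefinitely; for such messages use requeue=False
        # together with a dead letter exchange.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)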

# Start consuming
channel.basic_consume(queue='order_queue', on_message_callback=callback)

print("Waiting for messages...")
channel.start_consuming()
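
Requeueing on every exception can trap a malformed message in an endless redelivery loop. One way to avoid that, sketched here under assumptions, is to separate the decision from the pika calls: malformed payloads and unexpected errors are rejected, and only failures marked as transient are requeued. TransientError, decide, and flaky are hypothetical names introduced for this illustration.

```python
import json
from typing import Callable


class TransientError(Exception):
    """Hypothetical marker for failures that may succeed on retry."""


def decide(body: bytes, process: Callable[[dict], None]) -> str:
    """Return 'ack', 'requeue', or 'reject' for one delivery."""
    try:
        message = json.loads(body)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "reject"  # malformed payload: retrying cannot help
    try:
        process(message)
    except TransientError:
        return "requeue"  # e.g. a downstream timeout
    except Exception:
        return "reject"  # unexpected failure: do not loop on it
    return "ack"


def flaky(message: dict) -> None:
    """Stand-in for business logic that hits a temporary failure."""
    raise TransientError("downstream timeout")


print(decide(b'{"order_id": "12345"}', lambda m: None))
print(decide(b"not json", lambda m: None))
print(decide(b"{}", flaky))
```

In the callback, 'ack' maps to basic_ack, 'requeue' to basic_nack with requeue=True, and 'reject' to basic_nack with requeue=False.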

Durable and Persistent Messages

Messages survive a broker restart only if they are published as persistent and routed to a durable queue:

# In publisher:
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # 2 = persistent, 1 = transient
        content_type='application/json'
    )
)

# In consumer setup:
channel.queue_declare(queue='order_queue', durable=True)

Dead Letter Exchange (DLX)

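Route rejected or expired messages to a separate queue:

# Declare main exchange and queue. If order_queue already exists without
# these arguments, the declaration fails with PRECONDITION_FAILED; delete
# the queue first or set the arguments through a policy.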
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(
    queue='order_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_events',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 3600000  # 1 hour TTL
    }
)

# Declare DLX
channel.exchange_declare(exchange='dlx_events', exchange_type='topic', durable=True)
channel.queue_declare(queue='order_dlq', durable=True)
channel.queue_bind(exchange='dlx_events', queue='order_dlq', routing_key='order.*')
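
The x-message-ttl argument is given in milliseconds, which is easy to mistype as a raw number. A small helper can derive the arguments from a timedelta instead. This is a sketch only, and the function name is hypothetical.

```python
from datetime import timedelta


def dead_letter_arguments(exchange: str, routing_key: str, ttl: timedelta) -> dict:
    """Build queue arguments for dead-lettering with a per-queue message TTL."""
    return {
        "x-dead-letter-exchange": exchange,
        "x-dead-letter-routing-key": routing_key,
        # x-message-ttl is an integer number of milliseconds
        "x-message-ttl": ttl // timedelta(milliseconds=1),
    }


args = dead_letter_arguments("dlx_events", "order.failed", timedelta(hours=1))
print(args["x-message-ttl"])  # 3600000
```

The resulting dictionary can be passed as the arguments parameter of queue_declare.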

Reject Message to DLX

def callback(ch, method, properties, body):
    try:
        # process_message is your application's own handler
        process_message(json.loads(body))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Send to DLX by nacking without requeue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
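
Messages can reach order_dlq for two reasons in this setup: a consumer rejected them, or they expired under the queue TTL. RabbitMQ records this in the x-death header of each dead-lettered message. The sketch below summarises that header for a consumer of order_dlq. The function name and the sample header values are illustrative assumptions, not captured broker output.

```python
from collections import Counter
from typing import Optional


def death_reasons(headers: Optional[dict]) -> Counter:
    """Count dead-lettering events per reason from an x-death header."""
    deaths = (headers or {}).get("x-death") or []
    reasons: Counter = Counter()
    for entry in deaths:
        reasons[entry.get("reason", "unknown")] += int(entry.get("count", 1))
    return reasons


# Illustrative header, shaped like properties.headers on a dead-lettered message
sample_headers = {
    "x-death": [
        {"queue": "order_queue", "reason": "rejected", "count": 1},
        {"queue": "order_queue", "reason": "expired", "count": 2},
    ]
}

print(death_reasons(sample_headers))
print(death_reasons(None))
```

In a pika callback, properties.headers would be passed in; it is None when a message carries no headers.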

Memory and Disk Management

Configure Memory Thresholds

Edit /etc/rabbitmq/rabbitmq.conf:

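# High watermark: fraction of available RAM at which publishers are blocked
vm_memory_high_watermark.relative = 0.6

# Paging ratio: fraction of the high watermark at which the broker starts
# paging messages to disk (this is not a "resume" threshold)
vm_memory_high_watermark_paging_ratio = 0.75

# Minimum free disk space; publishers are blocked below this limit
disk_free_limit.absolute = 2GB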

Restart RabbitMQ:

systemctl restart rabbitmq-server
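
To make the two memory settings concrete, the arithmetic can be worked through for a given machine. The sketch below assumes the broker sees all installed RAM (container or cgroup limits would change the base figure); the function name and the 16 GiB example are assumptions.

```python
GIB = 1024 ** 3


def memory_thresholds(total_ram_bytes: int,
                      high_watermark: float = 0.6,
                      paging_ratio: float = 0.75) -> dict:
    """Compute the byte thresholds implied by the two memory settings."""
    block_at = int(total_ram_bytes * high_watermark)
    # The paging ratio is relative to the high watermark, not to total RAM
    page_at = int(block_at * paging_ratio)
    return {"block_publishers_at": block_at, "start_paging_at": page_at}


thresholds = memory_thresholds(16 * GIB)
for name, value in thresholds.items():
    print(f"{name}: {value / GIB:.1f} GiB")
```

On a 16 GiB host these settings block publishers at about 9.6 GiB and start paging at about 7.2 GiB of broker memory use.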

Monitor Memory Usage

# Check memory stats, broken down by category
rabbitmq-diagnostics memory_breakdown --unit MB

# List queue memory usage
rabbitmqctl list_queues name messages memory

RabbitMQ Clustering

Add Node to Cluster

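On node 2, join the cluster formed by node 1. Both nodes must share the same Erlang cookie (/var/lib/rabbitmq/.erlang.cookie) and be able to resolve each other's hostnames:

# The RabbitMQ service must be running on node 2.
# Stop the RabbitMQ application but keep the Erlang runtime running
rabbitmqctl stop_app

# Reset the node so it can join the cluster (this erases its existing data)
rabbitmqctl reset

# Join cluster (node1 is the seed node)
rabbitmqctl join_cluster rabbit@node1

# Start app
rabbitmqctl start_app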

Verify Cluster Status

# On any node
rabbitmqctl cluster_status

# Expected output shows all nodes
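
For automated checks, the status can be requested as JSON with rabbitmqctl cluster_status --formatter json and compared against the nodes you expect. The sketch below assumes the output contains a running_nodes list; verify the key name against your version's actual output. The function name and the sample document are illustrative and heavily trimmed.

```python
import json


def missing_nodes(cluster_status_json: str, expected: set) -> set:
    """Return the expected nodes that are not reported as running."""
    status = json.loads(cluster_status_json)
    running = set(status.get("running_nodes", []))
    return set(expected) - running


# Illustrative, trimmed sample; real output has many more fields
sample = json.dumps({
    "disk_nodes": ["rabbit@node1", "rabbit@node2"],
    "running_nodes": ["rabbit@node1"],
})

print(missing_nodes(sample, {"rabbit@node1", "rabbit@node2"}))
```

An empty result means every expected node is running. In the sample, rabbit@node2 is reported as missing.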

In production, use RabbitMQ with:

  • Quorum queues: For replication and automatic failover (classic mirrored queues are deprecated and were removed in RabbitMQ 4.0)
  • Persistent storage: Configure disk space limits
  • Monitoring: Use the rabbitmq_prometheus plugin for metrics
  • Network isolation: Secure inter-node communication with TLS

Nginx Reverse Proxy

Place the RabbitMQ management UI behind an Nginx reverse proxy that terminates TLS:

upstream rabbitmq_management {
    server localhost:15672;
}

server {
    listen 443 ssl http2;
    server_name rabbitmq.example.com;

    ssl_certificate /etc/letsencrypt/live/rabbitmq.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/rabbitmq.example.com/privkey.pem;

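    # Note: the management UI also authenticates with HTTP Basic auth, so both
    # layers share the Authorization header. Keep the credentials in .htpasswd
    # identical to a RabbitMQ user, or restrict access by IP instead.
    auth_basic "RabbitMQ Management";
    auth_basic_user_file /etc/nginx/.htpasswd;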

    location / {
        proxy_pass http://rabbitmq_management;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

Next Steps

  • Set up Prometheus monitoring with rabbitmq_prometheus plugin
  • Configure log rotation with logrotate
  • Implement message TTL and expiration policies
  • Design queue topology for your application
  • Test failover scenarios in staging environment