Demo of some RabbitMQ features

This demo shows the following:

1) we can copy every message sent to a special audit exchange

2) messages left unprocessed on queues (after a period of time ) can be sent to a special Dead Letter Exchange (DLE)

3) messages that were sent to a Topic Exchange but which were unrouted can be sent to a special Unrouted Exchange.

RabbitMQ Setup

The demo uses RabbitMQ resources which are all predefined in this this configuration file. You will  need to have Admin Access to a RabbitMQ server to load the configuration file into RabbitMQ and perform the tasks outlined below.

The configuration file defines the following RabbitMQ resources:

Exchanges (5)

  1. sys.ex.audit - where a copy of every message sent to ex.entry.point gets sent
  2. sys.ex.unrouted - where unrouted messages get sent
  3. sys.ex.dead.letter - where unprocessed ( timed out ) messages get sent
  4. ex.entry.point - where messages enter the system for routing and processing
  5. ex.router.main - where the initial routing of messages happen.

Queues (5)

  1. sys.q.audit.01 - a queue bound to sys.ex.audit
  2. sys.q.dead.letter.01 - a queue bound to sys.ex.dead.letter
  3. sys.q.unrouted.01 - a queue bound to sys.ex.unrouted
  4. q.type1 - a queue that gets 'type1' messages from ex.router.main
  5. q.type2 - a queue that gets 'type2' messages from ex.router.main

Policies(2)

  1. unroutedMessagePolicy - applies to all Exchanges starting with 'ex.router'. If the message is unrouted by the Exchange, it will be sent to the 'alternate exchange' : sys.ex.unrouted
  2. messageTimeoutPolicy - applies to all Queues starting with 'q'. If the message is left unprocessed on the Queue after 30 seconds, it will be sent to the 'dead-lettter-exchange' : sys.ex.dead.letter

Notes about the Demo configuration

The naming of resources is important - this ensures application Queues and Exchanges are separated out from System Queues and Exchanges. For example messages on the sys.q.dead.letter.01 don't have a TTL but those on q.type1 and q.type2 do.

We've had to use 2 exchanges ex.entry.point and ex.router.main. The reason for this is that if have a single exchange from which we copy every message to audit - this will stop messages from appearing unrouted even if they only go to the audit queue.

The functionality covered by the 2 policies could be defined on each Queue/Exchange but this is easy to get wrong. Using policies reduces overhead and increases quality.

Instructions for setting up the demo and exploring its features

Assumptions : you have RabbitMQ admin username and password

Rabbit Credentials
$ export RBT_USER=<your rabbit admin user>
$ export RBT_PASSWD=<your rabbit admin password>

Create a variable to hold the new vhost name for this demo - 'vhDemo01' is the example used here.

Setup VHOST variable
$ export VHOST='vhDemo01'

Create the Virtual Host (vhost)

Create the Virtual Host
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD declare vhost name=$VHOST
vhost declared

Import the rabbit config for this demo into this vhost. This assumes you have called your config file 'rabbit_config_for_demo.json'

Import Demo Rabbit Config
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST import rabbit_config_for_demo.json
Uploaded definitions from "localhost" to rabbit_config_for_demo.json. The import process may take some time. Consult server logs to track progress.


Check that 5 new Exchanges have been defined.

Note the naming convention - "system or management" exchanges used to help ensure good management of our rabbit setup - start with "sys.ex"

- other exchanges start with "ex". This naming pattern is important because of the Policies which apply extra functionality to groups of Queues/Exchanges.

Check New Exchanges in vhost
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list exchanges | grep -v amq | grep -v direct
+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
| ex.entry.point     | fanout  |
| ex.router.main     | topic   |
| sys.ex.audit       | fanout  |
| sys.ex.dead.letter | fanout  |
| sys.ex.unrouted    | fanout  |
+--------------------+---------+

Check that 5 new queues have been defined.

Note the naming convention - "system or management" queues used to help ensure good management of our rabbit setup - start with "sys.q"

- other exchanges start with "q.". This naming pattern is important.

Notice that the queues are all empty.

Check New Queues for VHOST
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 0        |
| sys.q.audit.01       | 0        |
| sys.q.dead.letter.01 | 0        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

Check that 7 new bindings have been defined.

The first 2 are fanout bindings from ex.entry.point to sys.ex.audit and ex.router.main

The next 2 are topic bindings from ex.router.main to q.type1 for routing key 'type1' and q.type2 for routing key 'type2'. Notice - no binding for 'type3'

The last 3 are fanout bindings for each of the 'sys' exchanges to their message queues.

Check new Bindings for vhost
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list bindings | grep -v '^|  '
+--------------------+----------------------+----------------------+
+--------------------+----------------------+----------------------+
| ex.entry.point     | ex.router.main       |                      |
| ex.entry.point     | sys.ex.audit         |                      |
| ex.router.main     | q.type1              | type1                |
| ex.router.main     | q.type2              | type2                |
| sys.ex.audit       | sys.q.audit.01       |                      |
| sys.ex.dead.letter | sys.q.dead.letter.01 |                      |
| sys.ex.unrouted    | sys.q.unrouted.01    |                      |
+--------------------+----------------------+----------------------+

Check that 2 new Policies have been defined

In addition to Exchanges, Queues and Bindings - this demo relies on 2 Routing "Policies". These help provide extra routing for unprocessed and unrouted messages to groups of Exchanges/Queues.

The policy 'unroutedMessagePolicy' applies to exchanges starting with 'ex.router' - any unrouted message gets sent to 'sys.ex.unrouted'

The policy 'messageTimeoutPolicy' applies to queues starting with 'q' - any message still on a queue after 30 seconds is sent to 'sys.ex.dead.letter'

Note: because of these policies - it's important to name the queues and exchanges carefully.

This shows the 2 policies described in the configuration.

Check new Policies
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list policies | grep "^| $VHOST"
| vhDemo01 | unroutedMessagePolicy | exchanges | {"alternate-exchange": "sys.ex.unrouted"}                            | ^ex.router.* | 0        |
| vhDemo01 | messageTimeoutPolicy  | queues    | {"message-ttl": 30000, "dead-letter-exchange": "sys.ex.dead.letter"} | ^q.*         | 1        |

Now we have checked that the Demo is setup correctly. We can now send messages to RabbitMQ to explore the working demo.

First, check that the queues are empty.

Check all Queues empty at start
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 0        |
| sys.q.audit.01       | 0        |
| sys.q.dead.letter.01 | 0        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

Sending a type1 message

If we send a message with routing key 'type1' to ex.entry.point

Send type1 message
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type1" exchange="ex.entry.point" payload="type1:test message"

Check where type1 message has ended up

It should be on q.type1 and a copy on sys.q.audit.01. This demonstrates routing based on message type and auditing.

After type1 message sent
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 1        |
| q.type2              | 0        |
| sys.q.audit.01       | 1        |
| sys.q.dead.letter.01 | 0        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

If we wait 30 seconds and check again. The message should timeout on q.type1 and be sent to sys.q.dead.letter.01. This demonstrates the messageTimeoutPolicy.

Check type1 message after 30 seconds
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 0        |
| sys.q.audit.01       | 1        |
| sys.q.dead.letter.01 | 1        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

Sending a type2 message

If we send a message with routing key 'type2' to ex.entry.point

Send type2 message
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type2" exchange="ex.entry.point" payload="type2:test message"


Check where type2 message has ended up

It should be on q.type2 and a copy on sys.q.audit.01. This demonstrates routing based on message type and auditing.

After type2 message sent
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 1        |
| sys.q.audit.01       | 2        |
| sys.q.dead.letter.01 | 1        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

If we wait 30 seconds and check again. The message should timeout on q.type2 and be sent to sys.q.dead.letter.01. This demonstrates the messageTimeoutPolicy.

Check 30 seconds after type2
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 0        |
| sys.q.audit.01       | 2        |
| sys.q.dead.letter.01 | 2        |
| sys.q.unrouted.01    | 0        |
+----------------------+----------+

Send a type3 message

If we send a message with routing key 'type3' to ex.entry.point

Send type3 message
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type3" exchange="ex.entry.point" payload="type3:test message"

The message should end up on sys.q.audit.01 because we copy every message to this queue.

The message should end up on sys.q.unrouted.01 because when it arrived at ex.router.main there is no mapping for 'type3' messages so it gets sent to sys.ex.unrouted which forwards to sys.q.unrouted.01. This demonstrates the unroutedMessagePolicy.

After type3 message sent
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues
+----------------------+----------+
|         name         | messages |
+----------------------+----------+
| q.type1              | 0        |
| q.type2              | 0        |
| sys.q.audit.01       | 3        |
| sys.q.dead.letter.01 | 2        |
| sys.q.unrouted.01    | 1        |
+----------------------+----------+