This demo shows the following:
1) we can copy every message sent to a special audit exchange
2) messages left unprocessed on queues (after a period of time ) can be sent to a special Dead Letter Exchange (DLE)
3) messages that were sent to a Topic Exchange but which were unrouted can be sent to a special Unrouted Exchange.
RabbitMQ Setup
The demo uses RabbitMQ resources which are all predefined in this this configuration file. You will need to have Admin Access to a RabbitMQ server to load the configuration file into RabbitMQ and perform the tasks outlined below.
The configuration file defines the following RabbitMQ resources:
Exchanges (5)
sys.ex.audit
- where a copy of every message sent toex.entry.point
gets sentsys.ex.unrouted
- where unrouted messages get sentsys.ex.dead.letter
- where unprocessed ( timed out ) messages get sentex.entry.point
- where messages enter the system for routing and processingex.router.main
- where the initial routing of messages happen.
Queues (5)
sys.q.audit.01
- a queue bound tosys.ex.audit
sys.q.dead.letter.01
- a queue bound tosys.ex.audit
sys.q.unrouted.01
- a queue bound tosys.ex.audit
q.type1
- a queue that gets 'type1' messages fromex.router.main
q.type2
- a queue that gets 'type2' messages fromex.router.main
Policies(2)
- unroutedMessagePolicy - applies to all Exchanges starting with 'ex.router'. If the message is unouted by the Exchange, it will be sent to the 'alternate exchange' :
sys.ex.unrouted
- messageTimeoutPolicy - applies to all Queues starting with 'q'. If the message is left unprocessed on the Queue after 30 seconds, it will be sent to the 'dead-lettter-exchange' :
sys.ex.dead.letter
.
Notes about the Demo configuration
The naming of resources is important - this ensures application Queues and Exchanges are separated out from System Queues and Exchanges. For example messages on the sys.q.dead.letter.01
don't have a TTL but those on q.type1
and q.type2
do.
We've had to use 2 exchanges ex.entry.point
and ex.router.main
. The reason for this is that if have a single exchange from which we copy every message to audit - this will stop messages from appearing unrouted even if they only go to the audit queue.
The functionality covered by the 2 policies could be defined on each Queue but this is easy to get wrong. Using policies reduces overhead and increases quality.
Instructions for setting up the demo and exploring its features
Assumptions : you have RabbitMQ admin username and password
$ export RBT_USER=<your rabbit admin user> $ export RBT_PASSWD=<your rabbit admin password>
Create a variable to hold the new vhost name for this demo - 'vhDemo01' is the example used here.
$ export VHOST='vhDemo01'
Create the Virtual Host (vhost)
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD declare vhost name=$VHOST vhost declared
Import the rabbit config for this demo into this vhost. This assumes you have called your config file 'rabbit_config_for_demo.json'
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST import rabbit_config_for_demo.json Uploaded definitions from "localhost" to rabbit_config_for_demo.json. The import process may take some time. Consult server logs to track progress.
Check that 5 new Exchanges have been defined.
Note the naming convention - "system or management" exchanges used to help ensure good management of our rabbit setup - start with "sys.ex"
- other exchanges start with "ex". This naming pattern is important.
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list exchanges | grep -v amq | grep -v direct +--------------------+---------+ | name | type | +--------------------+---------+ | ex.entry.point | fanout | | ex.router.main | topic | | sys.ex.audit | fanout | | sys.ex.dead.letter | fanout | | sys.ex.unrouted | fanout | +--------------------+---------+
Check that 5 new queues have been defined.
Note the naming convention - "system or management" queues used to help ensure good management of our rabbit setup - start with "sys.q"
- other exchanges start with "q.". This naming pattern is important.
Notice that the queues are all empty.
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 0 | | sys.q.audit.01 | 0 | | sys.q.dead.letter.01 | 0 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
Check that 7 new bindings have been defined.
The first 2 are fanout bindings from ex.entry.poin
t to sys.ex.audit
and ex.router.main
The next 2 are topic bindings from ex.router.main
to q.type1
for routing key 'type1' and q.type2
for routing key 'type2'
The last 3 are fanout bindings for each of the 'sys' exchanges to their queues.
davidhay@localhost RABBITMQADMIN-DEMO (master) $ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list bindings | grep -v '^| ' +--------------------+----------------------+----------------------+ +--------------------+----------------------+----------------------+ | ex.entry.point | ex.router.main | | | ex.entry.point | sys.ex.audit | | | ex.router.main | q.type1 | type1 | | ex.router.main | q.type2 | type2 | | sys.ex.audit | sys.q.audit.01 | | | sys.ex.dead.letter | sys.q.dead.letter.01 | | | sys.ex.unrouted | sys.q.unrouted.01 | | +--------------------+----------------------+----------------------+
Check that 2 new Policies have been defined
In addition to Exchanges, Queues and Bindings - this demo relies on 2 Routing "Policies". These help provide extra routing for unprocessed and unrouted messages.
The policy 'unroutedMessagePolicy' applies to exchanges starting with 'ex.router' - any unrouted message gets sent to 'sys.ex.unrouted'
The policy 'messageTimeoutPolicy' applies to queues starting with 'q' - any message still on a queue after 30seconds is sent to 'sys.ex.dead.letter'
Note : because of these policies - it's important to name the queues and exchanges carefully.
This shows the 2 policies described above.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list policies | grep "^| $VHOST" | vhDemo01 | unroutedMessagePolicy | exchanges | {"alternate-exchange": "sys.ex.unrouted"} | ^ex.router.* | 0 | | vhDemo01 | messageTimeoutPolicy | queues | {"message-ttl": 30000, "dead-letter-exchange": "sys.ex.dead.letter"} | ^q.* | 1 |
Now we have checked that the Demo is setup correctly. We can now send messages to RabbitMQ to explore the working demo.
First, check that the queues are empty.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 0 | | sys.q.audit.01 | 0 | | sys.q.dead.letter.01 | 0 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
Sending a type1 message
If we send a message with routing key 'type1' to ex.entry.point
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type1" exchange="ex.entry.point" payload="type1:test message"
Check where type1 message has ended up
It should be on q.type1
and a copy of sys.q.audit.01
. This demonstrates routing based on message type and auditing.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 1 | | q.type2 | 0 | | sys.q.audit.01 | 1 | | sys.q.dead.letter.01 | 0 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
If we wait 30 seconds and check again. The message should timeout on q.type1
and be sent to sys.q.dead.letter.01.
This demonstrates the messageTimeoutPolicy.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 0 | | sys.q.audit.01 | 1 | | sys.q.dead.letter.01 | 1 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
Sending a type2 message
If we send a message with routing key 'type2' to ex.entry.point
$ rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type2" exchange="ex.entry.point" payload="type2:test message"
Check where type2 message has ended up
It should be on q.type2
and a copy of sys.q.audit.01
. This demonstrates routing based on message type and auditing.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 1 | | sys.q.audit.01 | 2 | | sys.q.dead.letter.01 | 1 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
If we wait 30 seconds and check again. The message should timeout on q.type2
and be sent to sys.q.dead.letter.01.
This demonstrates the messageTimeoutPolicy.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 0 | | sys.q.audit.01 | 2 | | sys.q.dead.letter.01 | 2 | | sys.q.unrouted.01 | 0 | +----------------------+----------+
Send a type3 message
If we send a message with routing key 'type3' to ex.entry.point
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST publish routing_key="type3" exchange="ex.entry.point" payload="type3:test message"
The message should end up on sys.q.audit.01
because we copy every message to this queue.
The message should end up on sys.q.unrouted.01
because when it arrived at ex.router.main
there is no mapping for 'type3' messages so it gets sent to sys.ex.unrouted
which forwards to sys.q.unrouted.01.
This demonstrates the unroutedMessagePolicy.
rabbitmqadmin --user=$RBT_USER --password=$RBT_PASSWD --vhost=$VHOST list queues +----------------------+----------+ | name | messages | +----------------------+----------+ | q.type1 | 0 | | q.type2 | 0 | | sys.q.audit.01 | 3 | | sys.q.dead.letter.01 | 2 | | sys.q.unrouted.01 | 1 | +----------------------+----------+
Add Comment