Event Based Messaging with RabbitMQ
Sending and Receiving Messages
Messages have a payload and properties. The properties include type, content-type, correlation id and routing key, plus support for additional custom headers.
Messages are always read from Queues.
Messages are written to Exchanges, which route them to Queues. You can have several Queues bound to an Exchange. There are several types of Exchange (a declaration sketch follows the list):
- Direct - will forward messages to bound queues if the message routing key matches the routing key in the binding.
- Default - The Default Exchange is a special Direct Exchange which doesn't have any explicit bindings. The message is sent to the Queue whose name matches the routing key. The Default Exchange has an empty name. The Default Exchange cannot be deleted.
- Fanout - will forward all messages to bound queues. The routing key pattern on the binding is not required.
- Topic - used for publish/subscribe - will forward to downstream queues/exchanges where the message's routing key matches the routing key pattern on the binding between the Topic Exchange and the bound, downstream Queues/Exchanges.
- Headers - routes messages based on arguments containing headers and optional values. Headers Exchanges are very similar to Topic Exchanges, but route messages based on header values instead of routing keys. A Headers Exchange can match on any or all of a list of header/value pairs. In addition to custom headers, Headers Exchanges can also route based on the following RabbitMQ message properties:
- content_type
- content_encoding
- priority
- correlation_id
- reply_to
- expiration
- message_id
- timestamp
- type
- user_id
- app_id
- cluster_id
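As an illustration, these exchange types can be declared from Spring using classes in org.springframework.amqp.core (a minimal sketch - the exchange names other than mainPubSubEx are invented):
@Bean DirectExchange paymentsDirectEx() { return new DirectExchange("payments.direct.ex"); }
@Bean FanoutExchange auditFanoutEx() { return new FanoutExchange("audit.fanout.ex"); }
@Bean TopicExchange mainPubSubEx() { return new TopicExchange("mainPubSubEx"); }
@Bean HeadersExchange headerRoutingEx() { return new HeadersExchange("header.routing.ex"); }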
Exchanges can be bound to Queues or other Exchanges.
Exchanges and Queues are linked via Bindings - sometimes the Binding uses a Routing Key Pattern. The Message's Routing Key can be a single word or, more generally, several words delimited by dots. The Routing Key Pattern supports * for exactly one word and # for zero or more words. For example, the pattern order.* matches order.created but not order.created.eu, whereas order.# matches both.
When you configure an Exchange you can specify an Alternate Exchange, which receives any message sent to the Exchange that the Exchange cannot route (to a Queue or another Exchange). This is different from, but related to, Dead Letter Exchanges, which receive messages that are rejected by Queue consumers, expire on a Queue, or are dropped because a Queue is full.
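As a sketch of how these fit together in Spring (the names, TTL and argument values are invented for the example; the argument keys are standard RabbitMQ ones):
@Bean
TopicExchange ordersEx() {
    // messages this exchange cannot route are sent to the alternate exchange
    return ExchangeBuilder.topicExchange("orders.ex")
            .withArgument("alternate-exchange", "unrouted.ex")
            .build();
}

@Bean
Queue orderCreatedQueue() {
    // rejected or expired messages are re-published to the dead letter exchange
    return QueueBuilder.durable("order.created.q")
            .withArgument("x-dead-letter-exchange", "dead.letter.ex")
            .withArgument("x-message-ttl", 60000)
            .build();
}

@Bean
Binding orderCreatedBinding() {
    // '*' matches exactly one word, '#' matches zero or more words
    return BindingBuilder.bind(orderCreatedQueue()).to(ordersEx()).with("order.*");
}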
Direct Messaging
To send a message to a specific Queue - you send the message to the 'Default Exchange' using the Queue Name as the 'routing key'. There is an implicit binding between the Default Exchange and every Queue.
This is inflexible because it ties the producer to a single, named Queue and therefore to a fixed consumer.
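A sketch of sending via the Default Exchange with RabbitTemplate (the queue name is invented):
public void sendDirect(RabbitTemplate rabbitTemplate) {
    // the empty-string exchange is the Default Exchange; the routing key is the queue name
    rabbitTemplate.convertAndSend("", "customer.queue", "hello direct");
}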
Publish/Subscribe (pub/sub) Messaging
Publish/Subscribe decouples a single producer from many (potential) receivers.
To implement pub/sub using RabbitMQ you define a Topic Exchange. You can bind many Queues to a Topic Exchange. If the routing key on the Message matches the routing key pattern that links the Topic Exchange with a Queue, then the message is delivered to that Queue. Note: a message sent to a Topic Exchange is delivered to every bound Queue with a matching binding.
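For example, a Binding with a routing key pattern might be declared like this (queue and exchange names invented):
@Bean
Binding orderEventsBinding(Queue orderEventsQueue, TopicExchange eventsTopicEx) {
    // delivers any message whose routing key starts with 'order.' to orderEventsQueue
    return BindingBuilder.bind(orderEventsQueue).to(eventsTopicEx).with("order.#");
}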
Spring Boot/RabbitMQ Spike
Experimenting with the integration of RabbitMQ and Spring - I was able to get the following working:
Running RabbitMQ using a Docker image and Docker Compose.
The image used was 'rabbitmq:3-management', which includes the RabbitMQ management plugin.
By default, RabbitMQ listens on port 5672 and the web-based management UI on port 15672. You can set the username/password using environment variables.
I was able to map the storage and logs directories to my local machine.
When connecting to RabbitMQ from Spring, the following properties are used:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=<username>
spring.rabbitmq.password=<password>
spring.rabbitmq.virtual-host=/
Configuring Queues/Exchanges in RabbitMQ
You can create Queues/Exchanges via the admin web interface. This is okay for experiments.
You can create Queues/Exchanges via Spring using the org.springframework.amqp.core.AmqpAdmin class (see the AmqpAdmin sketch below). This is okay for experiments BUT you probably don't want services creating their own queues, and it probably relies on the RabbitMQ user having 'admin' privileges too.
You can create Queues/Exchanges by using the rabbitmqadmin command locally, pointing it at the RabbitMQ server using the --host and --port parameters.
You can create Queues/Exchanges by using the rabbitmqadmin command within the docker container running rabbit.
You can export a broker definitions JSON file from the web interface and import one too. The command-line tool rabbitmqadmin can also be used to export/import this configuration. This is probably what we want in order to have configuration as code.
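As a sketch of the AmqpAdmin approach (queue, exchange and routing key names are examples):
@Autowired
private AmqpAdmin amqpAdmin;

public void declareResources() {
    // declares the queue, exchange and binding on the broker if they don't already exist
    Queue queue = new Queue("customer.queue", true);
    TopicExchange exchange = new TopicExchange("mainPubSubEx");
    amqpAdmin.declareQueue(queue);
    amqpAdmin.declareExchange(exchange);
    amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("customer"));
}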
Sending messages to Queues using Direct Exchanges
RabbitMQ payloads are byte arrays. Without a converter, you have to convert messages to/from bytes yourself; by default, Spring's RabbitTemplate performs the conversion for String payloads.
It was straightforward to send messages to Queues using org.springframework.amqp.rabbit.core.RabbitTemplate
It was straightforward to receive messages from Queues using the org.springframework.amqp.rabbit.annotation.RabbitListener annotation.
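A minimal sketch of both (the queue name and payload are invented):
public void send(RabbitTemplate rabbitTemplate) {
    // with the Default Exchange, the routing key is the queue name
    rabbitTemplate.convertAndSend("demo.queue", "hello");
}

@RabbitListener(queues = "demo.queue")
public void handle(String message) {
    System.out.println("received: " + message);
}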
Pub/Sub using a single Topic Exchange
I was able to get pub/sub working with 1 topic exchange called 'mainPubSubEx' with 6 queues bound to it as follows:
When a message with routing key of 'type1' comes in - it will be delivered to 4 queues : typeAnyQueue1, type1Queue1, type1Queue2, type1Queue3
When a message with routing key of 'type2' comes in - it will be delivered to 3 queues : typeAnyQueue1, type2Queue1, type2Queue2
When a message with routing key of 'type3' comes in - it will be delivered to 1 queue : typeAnyQueue1
typeAnyQueue1 gets every message.
The 3 queues bound with 'type1' only get 'type1' messages.
The 2 queues bound with 'type2' only get 'type2' messages.
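These bindings might be declared roughly as follows (a sketch - it assumes typeAnyQueue1 is bound with the '#' pattern so it matches every routing key, and uses Spring AMQP's Declarables to group them; six separate Binding beans would work equally well):
@Bean
Declarables pubSubBindings(TopicExchange mainPubSubEx, Queue typeAnyQueue1, Queue type1Queue1,
        Queue type1Queue2, Queue type1Queue3, Queue type2Queue1, Queue type2Queue2) {
    return new Declarables(
            BindingBuilder.bind(typeAnyQueue1).to(mainPubSubEx).with("#"),      // every message
            BindingBuilder.bind(type1Queue1).to(mainPubSubEx).with("type1"),
            BindingBuilder.bind(type1Queue2).to(mainPubSubEx).with("type1"),
            BindingBuilder.bind(type1Queue3).to(mainPubSubEx).with("type1"),
            BindingBuilder.bind(type2Queue1).to(mainPubSubEx).with("type2"),
            BindingBuilder.bind(type2Queue2).to(mainPubSubEx).with("type2"));
}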
Pre-Send/Post-Receive Message Post Processors
It's possible to add pre-send and post-receive post processors. These can be used to reduce boilerplate by, for example, adding 'standard' properties to messages, such as the name of the microservice sending the message. I've been able to get the pre-send post processor working but not the post-receive one.
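A pre-send post processor can be registered on the RabbitTemplate; a minimal sketch (the header name and service name are examples):
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setBeforePublishPostProcessors(message -> {
        // add a 'standard' header identifying the sending service
        message.getMessageProperties().setHeader("sending-service", "tis-demo-service");
        return message;
    });
    return template;
}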
Mapping JSON messages to Java Objects
If you define a Spring bean which is an instance of org.springframework.amqp.support.converter.Jackson2JsonMessageConverter, then conversion between JSON message payloads and Java objects is done automatically by Spring.
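For example, a converter bean might be declared like this:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    // picked up by Spring Boot and applied to the RabbitTemplate and listener containers
    return new Jackson2JsonMessageConverter();
}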
For sending, use RabbitTemplate.convertAndSend.
For receiving, use the @RabbitListener annotation to specify which queue(s) you want to receive from and simply use the Java type in the method signature.
For example, to read JSON messages from a Queue that are automatically converted into Customer instances:
@RabbitListener(queues = RabbitInfo.QUEUE_CUSTOMERS)
public void receiveCustomer(Customer customer, Message msg) {
    // 'customer' is deserialised from the JSON payload; 'msg' gives access to the raw message and its properties
}
This works well when you only want to receive one type of message from a Queue. The following do not work:
- simply having multiple methods annotated with @RabbitListener trying to read different object types from the same queue - extra setup is required for this.
- using a 'parent type' in the hope that Spring will be able to deliver subclass instances for the different messages.
Validate Received Messages
As we want to move to JSON messages as a portable serialisation format, it makes sense to validate received messages against a JSON Schema.
If we have JSON Schemas defined for each of our messages, it makes sense to validate them when they're received and only process valid messages.
Here are the 2 approaches that were considered:
1) Validate the message using a JSON Schema library. If invalid, reject it and log errors. If valid, convert it to a Java object and process it.
I looked at the everit schema validator. Using this validator I was able to validate using JSON Schema files that refer to other JSON Schema files. This approach would mean more boilerplate code to validate messages whenever they are received, so it was not pursued.
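For reference, validation with everit (org.everit.json.schema) looks roughly like this (a sketch - the schema path is an example):
boolean isValid(String messageBody) {
    try (InputStream schemaStream = getClass().getResourceAsStream("/schemas/customer.schema.json")) {
        Schema schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaStream)));
        schema.validate(new JSONObject(messageBody)); // throws ValidationException if invalid
        return true;
    } catch (ValidationException | IOException e) {
        // reject the message and log the validation failures
        return false;
    }
}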
2) Use a conversion tool to convert the JSON Schema into JSR303 (standard Java Bean Validation) annotated POJOs, then use @Valid to validate messages when we receive them.
This does work and it keeps the code tidy. The downside is that JSON Schema is more sophisticated than JSR303 annotations. The conversion tool, jsonschema2pojo, can be used online, as a command-line tool and even as a Maven plugin.
I was able to generate POJOs for the Reconciliation and Position messages defined on the page Message Format and Examples. For these two messages the JSR303 annotations did a good job of reflecting the JSON Schema.
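As a sketch of what approach 2 amounts to, the generated POJOs can be checked with the standard Bean Validation API (the Customer type is illustrative; @Valid on the listener triggers equivalent checks):
public boolean isValid(Customer customer) {
    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
    Set<ConstraintViolation<Customer>> violations = validator.validate(customer);
    // if not empty, reject the message and log each violation's property path and message
    violations.forEach(v -> System.out.println(v.getPropertyPath() + ": " + v.getMessage()));
    return violations.isEmpty();
}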
TODO
Create non-admin users for sending and receiving messages but without create queue/exchange privilege.
Be able to start up RabbitMQ with a known configuration of Queues/Exchanges - see Configuring RabbitMQ Resources.
Be able to run a single command to purge all queues. This might be handy for integration testing - see Configuring RabbitMQ Resources.
Be able to have Spring read different types of messages from the same queue and automatically bind these messages to Java objects and validate them.
Look at why the Post Receive Post Processor is not working
Look at improving validation error messages.
Configure Spring so that it knows how to deserialise a message based on the 'type'/'__TypeId__' property. Not having this is stopping us from reading different message types from a single queue.
Investigate how we might test RabbitMQ/Spring integration, i.e. test sent messages and invoke receive endpoints. Done with Testcontainers - needs documenting.
At some point, we may be able to create a Spring/Rabbit support library. - Done in https://github.com/Health-Education-England/TIS-rabbit-mongo-spring-boot-starter
DeadLetterQueues / TTL / Durability - Done a demo with DLQ and TTL - needs documenting.
QUESTIONS
Is a simple Pub/Sub configuration using 1 Topic Exchange enough?
Do we need a more sophisticated routing key which is more than just the 'message type'? Can we incorporate a version?
Is the JSON Schema → JSR303 POJO enough validation or do we need to properly validate against JSON Schema?
Should we validate messages before we send them?
Is there anything else I've missed?
RESOURCES
RabbitMQ Exchanges, routing keys and bindings