Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Sending and Receiving Messages

Messages have a payload & properties. The properties including type, content-type, correlation id, routing key and support for additional custom headers.

Messages are always read from Queues.

Messages are written to Exchanges where they are routed to Queues. You can have several Queues bound to an Exchange. There are several types of exchanges:

  • Direct - will forward messages to bound queues if the routing key matches the queue name. The routing key pattern on the binding is not required.
  • Fanout - will forward all messages to bound queues. The routing key pattern on the binding is not required.
  • Topic - used for pub/sub - will forward to queues where the message's routing key matches the routing key pattern on the binding between the Queue and the Topic Exchange.
  • HeaderA headers exchange routes messages based on arguments containing headers and optional values. Headers exchanges are very similar to topic exchanges, but route messages based on header values instead of routing keys. 

Exchanges can be bound to Queues or other Exchanges.
Exchanges and Queues are linked via Bindings - sometimes the Binding uses a Routing Key Pattern. The Message's Routing Key can be a single word. More generally, it can be several words delimited by dots. The Routing Key pattern supports * for any 1 word and also # for any number of words.

Direct Messaging

To send a message to a specific Queue - you send the message to the 'default exchange' using the Queue Name as the 'routing key'. There is no routing key pattern required on the binding between the Direct Exchange and the Queues it's bound too.

This is not flexible because it ties the producer to a fixed, single consumer.

Pub/Sub Messaging

Publish/Subscribe decouples a single producer from many (potential) receivers.

To implement pub/sub using RabbitMQ you need to define a Topic Exchange. You can bind many queues to a Topic Exchange. If the routing key associated with the Message matches the routing key pattern that links the Topic Exchange with the Queue - then the message will be delivered to the Queue. Note : The message sent to a Topic Exchange will be delivered to every attached Queue that has a matching binding.

Spring Boot/RabbitMQ Spike


Experimenting with the integration of RabbitMQ and Spring - I was able to get the following working:

Using RabbitMQ using docker image and docker compose.

The image used was 'rabbitmq:3-management' which includes the rabbit management plugin.

Typically - RabbitMQ is on 5672 - the web based management UI is on port 15672. You can set the username/password using environmental variables.

I was able to map the storage and logs directories to my local machine.

When connecting to rabbit from spring - the following properties are used

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=<username>

spring.rabbitmq.password=<password>


Configuring Queues/Exchanges in RabbitMQ

You can create Queues/Exchanges via the admin web interface. This is okay for experiments.
You can create Queues/Exchanges via Spring using the org.springframework.amqp.core.AmqpAdmin class. This is okay for experiments BUT you probably don't want services creating their own queues. This probably relies on the rabbit user having 'admin' privileges too.
You can create Queues/Exchanges by using the rabittctl command within the docker container running rabbit.
You can export Rabbit Broker Definitions json file from the web interface and import one too. Looks like there is another command line tool called rabbitmqadmin which can be used to export/import configs. This is probably what we want to have configuration as code.

Sending messages to Queues using Direct Exchanges

Rabbit Payloads are an array of bytes. So even for String based messaging - without a converter, you have to convert a String to/from bytes.
It was straightforward to send messages to Queues using org.springframework.amqp.rabbit.core.RabbitTemplate
It was straightforward to recv messages from Queues using the annotation : org.springframework.amqp.rabbit.annotation.RabbitListener

Pub/Sub using a single Topic Exchange

I was able to get pub/sub working with 1 topic exchange called 'mainPubSubEx' with 6 queues bound to it as follows:

When a message with routing key of 'type1' comes in - it will be delivered to 4 queues : typeAnyQueue1, type1Queue1, type1Queue2, type1Queue3

When a message with routing key of 'type2' comes in - it will be delivered to 3 queues : typeAnyQueue1, type2Queue1, type2Queue2

When a message with routing key of 'type3' comes in - it will be delivered to 1 queue : typeAnyQueue1

The typeAnyQueue1 - gets every message.

The 3 queues bound with 'type1' only get 'type1' messages. 

The 2 queues bound with 'type2' only get 'type2' messages.

PreSend/Post Receive Message Post Processors

It's possible to add pre-send and post-recv Post Processors. These can be used to reduce boiler plate by, for example, adding 'standard' properties to messages - such as the name of the micro service sending the message. I've been able to get the pre-send post-processor working but not the post-receive post-processor.Mapping JSON messages to Java Objects

If you define a Spring bean which is an instance of org.springframework.amqp.support.converter.Jackson2JsonMessageConverter. Then conversion between JSON message payloads and Java objects is done automatically by Spring. 
For sending use RabbitTemplate.sendAndConvert
For Receiving use the '@MessageListener' annotation to specify which queue(s) you want to receive from and just the the Java type in the method.
For example to read json messages from a Queue that are automatically converted into Customer instances:


@RabbitListener(queues = RabbitInfo.QUEUE_CUSTOMERS)
public void receiveCustomer(Customer customer, Message msg) {

This works well when you only want to receive 1 type of message from a Queue. The following does not work:

  1. simple having multiple methods annotated with '@MessageListener' trying to read different objects from the same queue. Extra setup is required for this.
  2. using a 'parent type' in the hope that Spring will be able to deliver subclass instances related to different messages.

Validate Received Messages

As we want to move to JSON messages as a portable serialisation format, It makes sense to use JSON validation to validate the messages are valid with respect to a JSON Schema.
If we have JSON schemas defined for each of our messages. It makes sense that we validate then when they're received and only process valid messages.

Here are the 2 approaches that were considered:
1) validate the message using a JSON schema library. If invalid - reject & log errors. If valid - convert to Java object and process.

I looked at the everit schema validator. Using this validator I was able to validate using JSON schema files that refer to other JSON schema files. This approach would mean more boiler plate code to validate messages whenever they are received. So this approach was not pursued.

2) Use a conversion tool to convert JSON schema to JSR303 (Standard Java Validation) annotated POJOS. Then we can use '@Valid' to validate messages when we receive them. This does work, it keeps the code tidy. The downside here is that JSON Schema is more sophisticated that JSR303 annotations. The tool to convert JSON Schema Json Schema 2 pojo can be used online, as a command line tool and even as a Maven plugin.

I was able to generate POJOS for the Reconciliation and Position messages defined on this page Message Format and Examples. Using these 2 messages - the JSR303 annotations did a good job of reflecting the  JSON schema.

TODO

Create non-admin users for sending and receiving messages but without create queue/exchange privilege.

Be able to startup rabbit to a known configuration of Queues/Exchanges.

Be able to run a single command to purge all queues. This might be handy for integration testing.

Be able to have Spring read different types of messages from the same queue and automatically bind these messages to Java objects and validate them.

Look at why the Post Receive Post Processor is not working

Look at improving validation error messages.

Configure Spring so that it knows how to de-serialize a message based on the 'type'/'_typeid__' property. This is stopping us from reading different messages from a single queue.

Investigate how we might test Rabbit/Spring integration. To test sent messages and invoke receive endpoints.

At some point, we may be able to create a Spring/Rabbit support library.

DeadLetterQueues / TTL / Durability

QUESTIONS

Is a simple Pub/Sub configuration using 1 Topic Exchange enough?

Do we need a more sophisticated routing key which is more than just the 'message type'? Can we incorporate version ?

Is the JSON Schema → JSR303 POJO enough validation or do we need to properly validate against JSON Schema?

Should we validate messages before we send them?

Is there anything else I've missed?

RESOURCES

RabbitMQ Exchanges, routing keys and bindings

Spring RabbitMQ

RabbitMQ Getting Started

JSON Schema To POJO

JSON Schema Validator in Java

RabbitMQ Management

RabbitMQ Docker Image



  • No labels