ESR Rejected RabbitMQ message processing

The TIS/ESR Microservices are implemented in Java and (typically) do work by processing inbound RabbitMQ messages.

Current Situation

At the moment, there is limited error handling support within these Microservices. If an error occurs, an exception will be thrown and one of four things will happen:

if the queue does NOT have an associated dead-letter exchange

  1. If the exception is an instance of AmqpRejectAndDontRequeueException the message will be dropped.

  2. otherwise the message will be returned to the source queue for repeated processing. This can lead to an infinite loop.

if the queue does have an associated dead-letter exchange

  1. If the exception is an instance of AmqpRejectAndDontRequeueException the message will be sent to the dead-letter exchange.

  2. otherwise the message will be returned to the source queue for repeated processing. This can lead to an infinite loop.
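The four outcomes above can be summarised as a small decision function. This is a plain-Java model for illustration only; the class, enum and method names are hypothetical and are not taken from the TIS/ESR code or from Spring AMQP.

```java
/** Illustrative model of the current failure behaviour (hypothetical names). */
final class CurrentFailureOutcomes {

    enum Outcome { DROPPED, DEAD_LETTERED, REQUEUED }

    /**
     * @param hasDeadLetterExchange whether the source queue has an associated dead-letter exchange
     * @param rejectAndDontRequeue  whether the exception is an AmqpRejectAndDontRequeueException
     */
    static Outcome outcomeFor(boolean hasDeadLetterExchange, boolean rejectAndDontRequeue) {
        if (!rejectAndDontRequeue) {
            // Any other exception: the message returns to the source queue,
            // so a message that always fails can loop forever.
            return Outcome.REQUEUED;
        }
        return hasDeadLetterExchange ? Outcome.DEAD_LETTERED : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        for (boolean dlx : new boolean[] {false, true}) {
            for (boolean reject : new boolean[] {true, false}) {
                System.out.println("dlx=" + dlx + ", reject=" + reject
                        + " -> " + outcomeFor(dlx, reject));
            }
        }
    }
}
```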

In the case where the message is rejected to the dead-letter exchange, it does not carry details of the exception that caused the rejection.

The current situation does not support retries. For transient problems such as OptimisticLockingFailureException, we would like to retry a number of times, because these errors should be temporary.

If during the processing of one message we fail AFTER sending a further message, the sent messages cannot be un-sent.

Proposed Solution

The proposed solution adds the following capabilities:

  1. Retries. The ability for messages to be retried. The number of retries and the delay between retries is configurable.

  2. Error details. When a message is rejected, error details will be added to the headers of rejected messages.

  3. Transactions. Rabbit messages will be processed within transactions. This means that messages 'sent' during the transaction will only actually be sent if the processing completes successfully.

  4. Optional Schema Validation. The proposed solution allows optional schema validation to reject invalid messages before they get passed to the main message processing code.

The following GitHub project holds the source code of a Proof of Concept (PoC) used to develop these capabilities.

https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc

Retries and Error Details

In order to add these capabilities we have to write some custom error handling logic in addition to adding some new Rabbit exchanges/queues and routes. Here is a diagram showing the Rabbit Configuration used by the PoC.


The 'ex.dead-letter' and 'q.dead-letter' take the place of the 'main.dlx' and 'esr.dlq.all' in the current ESR Rabbit config. They represent a dead-letter exchange and a dead-letter queue attached to the dead-letter exchange.

The 'ex.error', 'ex.error.delay' and 'q.error.delay' are new Rabbit resources required to support retries.

The 'ex.main' takes the place of the 'main.exchange' in the current ESR Rabbit config.

The 'q.type.1' is used in the PoC to represent a typical Rabbit queue which stores messages to be received and processed by a TIS/ESR Microservice.

The CustomErrorRecover intercepts exceptions raised while processing a Rabbit message. Its PoC implementation is https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/CustomMessageRecoverer.java, which extends the framework class RepublishMessageRecoverer (spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java in spring-projects/spring-amqp). Before forwarding the Rabbit message to the 'ex.error' exchange, the CustomErrorRecover adds the following headers to it:

Header | Description
z-attempts | Number of failed attempts. If the z-attempts header does not exist, it is set to 1. If it does exist, it is incremented by 1.
z-do-requeue | Set to 'false' if the failure was caused by 'AmqpRejectAndDontRequeueException', 'true' otherwise.
z-exception-type | The underlying type of exception which caused this failure.
z-validation-errors | A list of validation errors, if the underlying exception type was 'org.everit.json.schema.ValidationException'.
x-exception-stacktrace | The actual Java stacktrace of the exception which caused the failure.
x-exception-message | The message of the exception which caused the failure.
x-original-exchange | The exchange of the message before the error occurred.
x-original-routingKey | The routing key of the message before the error occurred.
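The rules for the 'z-' headers can be sketched in plain Java as follows. This is not the PoC code: the class and method names are hypothetical, RejectAndDontRequeueException is a local stand-in for Spring AMQP's AmqpRejectAndDontRequeueException, and storing 'z-do-requeue' as the strings "true"/"false" is an assumption. Unwrapping to the underlying exception and the 'z-validation-errors' header are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of how the 'z-' headers could be derived (hypothetical names). */
final class ErrorHeaderSketch {

    /** Local stand-in for Spring AMQP's AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeueException extends RuntimeException {
        RejectAndDontRequeueException(String message) {
            super(message);
        }
    }

    /**
     * @param existing headers already present on the failed message
     * @param cause    the exception that caused the failure
     * @return the headers to add before the message is forwarded to 'ex.error'
     */
    static Map<String, Object> errorHeaders(Map<String, Object> existing, Throwable cause) {
        Map<String, Object> headers = new HashMap<>();

        // z-attempts: 1 on the first failure, otherwise the previous value plus 1.
        Object previous = existing.get("z-attempts");
        int attempts = (previous instanceof Number) ? ((Number) previous).intValue() + 1 : 1;
        headers.put("z-attempts", attempts);

        // z-do-requeue: "false" only when the failure asks not to be requeued.
        boolean requeue = !(cause instanceof RejectAndDontRequeueException);
        headers.put("z-do-requeue", String.valueOf(requeue));

        headers.put("z-exception-type", cause.getClass().getName());
        return headers;
    }
}
```

In Spring AMQP, RepublishMessageRecoverer offers an additionalHeaders(Message, Throwable) hook that subclasses can override, which is one place where logic like this could live.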

Typically, each @RabbitListener annotation specifies the name of a specific 'containerFactory' Spring Bean which customises the link between TIS/ESR code and RabbitMQ. In the PoC, the 'containerFactory' used is created by the class https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/ContainerFactoryFactory.java. This can create 'Container Factory' Spring Beans which have the CustomErrorRecover added in.

The ContainerFactoryFactory customises instances of CustomErrorRecover with the name of the exchange to send rejected messages to. In the PoC, the name of this exchange is 'ex.error'.

When a message arrives at 'ex.error', if it has the header 'z-do-requeue' with value 'true', the message is sent to the 'ex.error.retry' exchange; otherwise it is sent to the 'ex.dead-letter' exchange.

When a message arrives at the 'ex.error.retry' exchange and has the header 'z-attempts' with a value of 1, 2, 3 or 4, it is sent to the queue 'q.error.delay'; otherwise it is sent to the 'ex.dead-letter' exchange.

NOTE : the maximum number of times a message is retried is controlled by the RabbitMQ configuration, not the code. This ensures the same number of retries for all TIS/ESR Microservices.
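The two routing rules above are applied by RabbitMQ itself, not by Java code. Purely to make the decision explicit, they can be modelled in plain Java as below; the class and method names are hypothetical, and the model assumes 'z-do-requeue' is the string "true" and 'z-attempts' is numeric.

```java
import java.util.Map;

/** Illustrative model of the broker-side routing of failed messages (hypothetical names). */
final class ErrorRoutingSketch {

    /** Rule applied at 'ex.error'. */
    static String routeFromErrorExchange(Map<String, Object> headers) {
        boolean requeue = "true".equals(String.valueOf(headers.get("z-do-requeue")));
        return requeue ? "ex.error.retry" : "ex.dead-letter";
    }

    /** Rule applied at 'ex.error.retry': only attempts 1 to 4 are retried. */
    static String routeFromRetryExchange(Map<String, Object> headers) {
        Object attempts = headers.get("z-attempts");
        if (attempts instanceof Number) {
            int n = ((Number) attempts).intValue();
            if (n >= 1 && n <= 4) {
                return "q.error.delay";
            }
        }
        return "ex.dead-letter";
    }

    /** Follows both hops and returns where a message published to 'ex.error' ends up. */
    static String destinationFor(Map<String, Object> headers) {
        String firstHop = routeFromErrorExchange(headers);
        return "ex.error.retry".equals(firstHop) ? routeFromRetryExchange(headers) : firstHop;
    }
}
```

Under this model, a message with 'z-do-requeue' set to 'false' goes straight to 'ex.dead-letter' whatever its attempt count, and a requeueable message goes to 'ex.dead-letter' once 'z-attempts' reaches 5.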

The queue 'q.error.delay' is special: it has a Time To Live (TTL), which is set to 10 seconds in the PoC. Once the TTL expires, messages are forwarded to the queue's dead-letter exchange, 'ex.main'. This is how we retry messages: we route them to a temporary queue where they wait for a while before being sent to the main routing exchange again. The TTL value is specified in milliseconds and can be configured.

NOTE : the amount of delay before a message is retried is controlled by RabbitMQ configuration, not the code. This ensures the same delay for all retry messages for all TIS/ESR Microservices.
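A delay queue of this kind is usually described by two standard RabbitMQ queue arguments, 'x-message-ttl' (in milliseconds) and 'x-dead-letter-exchange'. The sketch below only builds that argument map; the class and method names are hypothetical, and it assumes the queue is configured through arguments rather than through a broker policy, which this page does not specify.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative builder for the arguments of a TTL-based delay queue (hypothetical names). */
final class DelayQueueArguments {

    /**
     * @param delay         how long a message waits before it is retried
     * @param retryExchange exchange that expired messages are dead-lettered to
     */
    static Map<String, Object> forDelayQueue(Duration delay, String retryExchange) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        // RabbitMQ expects the TTL in milliseconds, so 10 seconds becomes 10000.
        arguments.put("x-message-ttl", delay.toMillis());
        // Expired messages are republished to this exchange, which triggers the retry.
        arguments.put("x-dead-letter-exchange", retryExchange);
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(forDelayQueue(Duration.ofSeconds(10), "ex.main"));
    }
}
```

A map like this would be passed as the arguments when declaring 'q.error.delay'.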

Examples of Rejected Messages produced using CustomErrorRecoverer

  1. Rejected Due to Schema Validation

Never re-tried. After a single failed attempt, sent to the dead-letter exchange.

  2. Rejected Five Times

Re-tried four times. After the fifth rejection, sent to the dead-letter exchange.

Transactions

When messages are processed within transactions, any messages sent while processing a message are only really sent if that processing completes without error. The support for transactions is all within the code, not in the Rabbit configuration.
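The effect described above can be simulated in plain Java: outgoing messages are buffered while a handler runs and are published only if the handler returns normally. This is a conceptual sketch with hypothetical names, not the Spring AMQP mechanism itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Conceptual simulation of transactional sends (hypothetical names, no broker involved). */
final class TransactionalSendSketch {

    /** Messages that have really been published; stands in for the broker. */
    private final List<String> published = new ArrayList<>();

    /**
     * Runs the handler with a 'send' function. Sends are buffered and are
     * published only if the handler completes without throwing.
     */
    void process(Consumer<Consumer<String>> handler) {
        List<String> pending = new ArrayList<>();
        handler.accept(pending::add); // if this throws, 'pending' is discarded (rollback)
        published.addAll(pending);    // reached only on success (commit)
    }

    /** Returns an immutable snapshot of the published messages. */
    List<String> published() {
        return List.copyOf(published);
    }
}
```

With this model, a handler that sends a message and then throws leaves the published list unchanged, which is the behaviour that removes the 'cannot be un-sent' problem.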

The following steps are used to bring Transaction support to message processing.

  1. Ensure the RabbitTemplate has channelTransacted set to 'true'.

see https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/TransactionalRabbitConfig.java

  2. Declare a RabbitTransactionManager bean.

see https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/TransactionalRabbitConfig.java

  3. Ensure that any message processing code annotated with @RabbitListener is also annotated with @Transactional.

see https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/test/java/com/hee/tis/esr/rabbiterrorpoc/transactional/MessageListenerTrans.java

Optional Schema Validation

This functionality is tied into the CustomMessageRecoverer (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/CustomMessageRecoverer.java) and the ContainerFactoryFactory (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/ContainerFactoryFactory.java). The ContainerFactoryFactory allows an optional schema to be provided to the 'createContainerFactory' method.
