ESR Rejected RabbitMQ message processing
The TIS/ESR Microservices are implemented in Java and (typically) do work by processing inbound RabbitMQ messages.
Current Situation
At the moment, there is limited error handling support within these Microservices. If an error occurs, an exception is thrown and one of four things will happen:
If the queue does NOT have an associated dead-letter exchange:
If the exception is an instance of AmqpRejectAndDontRequeueException, the message will be dropped.
Otherwise, the message will be returned to the source queue for repeated processing. This can lead to an infinite loop.
If the queue does have an associated dead-letter exchange:
If the exception is an instance of AmqpRejectAndDontRequeueException, the message will be sent to the dead-letter exchange.
Otherwise, the message will be returned to the source queue for repeated processing. This can lead to an infinite loop.
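The four outcomes above can be summarised as a small decision function. This is only a sketch for reasoning about the current behaviour: the class, enum and method names are hypothetical and it does not call any RabbitMQ or Spring API.

```java
/** Illustrative model of the four outcomes listed above; not framework code. */
final class CurrentRejectionBehaviour {

    enum Outcome { DROPPED, DEAD_LETTERED, REQUEUED }

    /**
     * @param queueHasDeadLetterExchange whether the source queue has a dead-letter exchange
     * @param rejectAndDontRequeue whether the exception is an AmqpRejectAndDontRequeueException
     */
    static Outcome outcome(boolean queueHasDeadLetterExchange, boolean rejectAndDontRequeue) {
        if (!rejectAndDontRequeue) {
            // Any other exception: the message goes back to the source queue,
            // which can repeat indefinitely.
            return Outcome.REQUEUED;
        }
        return queueHasDeadLetterExchange ? Outcome.DEAD_LETTERED : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        for (boolean dlx : new boolean[] {false, true}) {
            for (boolean reject : new boolean[] {true, false}) {
                System.out.printf("dlx=%b reject=%b -> %s%n", dlx, reject, outcome(dlx, reject));
            }
        }
    }
}
```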
In the case where the message is rejected to the dead-letter exchange, it does not carry details of the exception that caused the rejection.
The current situation does not support retries. For problems like OptimisticLockingFailureException, we would like to retry a number of times, as these errors should be temporary.
If during the processing of one message we fail AFTER sending a further message, the sent messages cannot be un-sent.
Proposed Solution
The proposed solution adds the following capabilities:
Retries. The ability for messages to be retried. The number of retries and the delay between retries are configurable.
Error details. When a message is rejected, error details will be added to the headers of rejected messages.
Transactions. Rabbit messages will be processed within Transactions. This means that messages 'sent' during the transaction will only actually be sent if the processing completes successfully.
Optional Schema Validation. The proposed solution allows optional schema validation to reject invalid messages before they get passed to the main message processing code.
The following GitHub project holds the source code of a Proof of Concept (PoC) used to develop these capabilities.
https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc
Retries and Error Details
In order to add these capabilities we have to write some custom error handling logic in addition to adding some new Rabbit exchanges/queues and routes. Here is a diagram showing the Rabbit Configuration used by the PoC.
The 'ex.dead-letter' and 'q.dead-letter' take the place of the 'main.dlx' and 'esr.dlq.all' in the current ESR Rabbit config. They represent a dead-letter exchange and a dead-letter queue attached to the dead-letter exchange.
The 'ex.error', 'ex.error.delay' and 'q.error.delay' are new Rabbit resources required to support retries.
The 'ex.main' takes the place of the 'main.exchange' in the current ESR Rabbit Config.
The 'q.type.1' is used in the PoC to represent a typical Rabbit queue which stores messages to be received and processed by a TIS/ESR Microservice.
The CustomErrorRecover intercepts exceptions from the processing of a Rabbit message. It is code written for the PoC (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/CustomMessageRecoverer.java) and extends the Spring AMQP framework class RepublishMessageRecoverer (spring-amqp/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java in the spring-projects/spring-amqp repository). The CustomErrorRecover adds the following headers to the Rabbit message before forwarding it to the 'ex.error' exchange:
| Header | Description |
|---|---|
| z-attempts | Number of failed attempts. If the z-attempts header does not exist, it's set to 1. If it does exist, it's incremented by 1. |
| z-do-requeue | Set to 'false' if the failure was caused by ' |
|  | The underlying type of exception which caused this failure. |
|  | This is a list of validation errors if the underlying exception type was ' |
|  | This holds the actual Java stacktrace of the exception which caused the failure. |
|  | The message of the exception which caused the failure. |
|  | The exchange of the message before the error occurred. |
|  | The routing key of the message before the error occurred. |
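As an illustration of the attempt-counting rule in the table, the following sketch updates a plain header map. It is not the PoC's code: the class and method names are hypothetical, the `retryable` flag stands in for whatever check the recoverer really performs, and storing the 'z-do-requeue' value as the string "true" or "false" is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing how the two 'z-' headers could be maintained. */
final class RejectionHeaders {

    static final String ATTEMPTS = "z-attempts";
    static final String DO_REQUEUE = "z-do-requeue";

    /** Returns a copy of the headers with the failure recorded; the input map is not modified. */
    static Map<String, Object> withFailure(Map<String, Object> headers, boolean retryable) {
        Map<String, Object> updated = new HashMap<>(headers);
        Object previous = updated.get(ATTEMPTS);
        // First failure: set to 1. Later failures: increment by 1.
        int attempts = (previous instanceof Number) ? ((Number) previous).intValue() + 1 : 1;
        updated.put(ATTEMPTS, attempts);
        // Assumption: the flag is carried as a string header value.
        updated.put(DO_REQUEUE, String.valueOf(retryable));
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> afterFirst = withFailure(new HashMap<>(), true);
        Map<String, Object> afterSecond = withFailure(afterFirst, true);
        System.out.println(afterFirst);
        System.out.println(afterSecond);
    }
}
```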
Typically, each @MessageListener annotation specifies the name of a specific 'containerFactory' Spring Bean which customises the link between TIS/ESR code and RabbitMQ. In the PoC, the 'containerFactory' used is created by the class ContainerFactoryFactory (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/ContainerFactoryFactory.java). This can create 'Container Factory' Spring beans which have the 'CustomErrorRecover' added in.
The ContainerFactoryFactory customises instances of CustomErrorRecover with the name of the exchange to send rejected messages to. In the PoC, the name of this exchange is 'ex.error'.
When a message arrives at 'ex.error', if it has the header 'z-do-requeue' with value 'true', the message is sent to the 'ex.error.retry' exchange; otherwise it's sent to the 'ex.dead-letter' exchange.
When a message arrives at the 'ex.error.retry' exchange and has the header 'z-attempts' with a value of 1, 2, 3 or 4, it is sent to the queue 'q.error.delay'; otherwise it's sent to the 'ex.dead-letter' exchange.
NOTE: the maximum number of times a message is retried is controlled by the RabbitMQ configuration, not the code. This ensures the same number of retries for all TIS/ESR Microservices.
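The two routing rules can be modelled as one function from headers to destination. In the PoC this routing is done by RabbitMQ configuration, so the sketch below is only a way to reason about or unit-test the expected destinations. The class and method names are hypothetical, and header values are assumed to arrive as a string flag and a numeric count.

```java
import java.util.Map;

/** Illustrative model of the routing performed by 'ex.error' and 'ex.error.retry'. */
final class ErrorRouting {

    static final String DELAY_QUEUE = "q.error.delay";
    static final String DEAD_LETTER_EXCHANGE = "ex.dead-letter";

    /** Returns the name of the resource a rejected message should end up in. */
    static String route(Map<String, Object> headers) {
        // Rule at 'ex.error': only messages flagged for requeue go on to 'ex.error.retry'.
        boolean requeue = "true".equals(String.valueOf(headers.get("z-do-requeue")));
        if (!requeue) {
            return DEAD_LETTER_EXCHANGE;
        }
        // Rule at 'ex.error.retry': attempts 1 to 4 are delayed and retried.
        Object attempts = headers.get("z-attempts");
        int count = (attempts instanceof Number) ? ((Number) attempts).intValue() : 0;
        return (count >= 1 && count <= 4) ? DELAY_QUEUE : DEAD_LETTER_EXCHANGE;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            Map<String, Object> headers = Map.of("z-do-requeue", "true", "z-attempts", attempt);
            System.out.println("attempt " + attempt + " -> " + route(headers));
        }
    }
}
```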
The queue 'q.error.delay' is special. It has a Time To Live (TTL), set to 10 seconds in the PoC. Once the TTL expires, messages are forwarded to the queue's dead-letter exchange, 'ex.main'. This is how we retry messages: we route them to a temporary queue where they wait for a while before being sent to the main routing exchange again. The TTL value is specified in milliseconds and can be configured.
NOTE: the amount of delay before a message is retried is controlled by RabbitMQ configuration, not the code. This ensures the same delay for all retry messages for all TIS/ESR Microservices.
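A delay queue of this kind is normally declared with two standard RabbitMQ queue arguments, 'x-message-ttl' and 'x-dead-letter-exchange'. The sketch below only builds that argument map. The class and method names are hypothetical, and how the PoC actually declares 'q.error.delay' is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the arguments for a TTL-based delay queue. */
final class DelayQueueArguments {

    /**
     * @param ttlMillis how long a message waits in the delay queue, in milliseconds
     * @param retryExchange exchange that expired messages are dead-lettered to
     */
    static Map<String, Object> forDelay(long ttlMillis, String retryExchange) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);
        arguments.put("x-dead-letter-exchange", retryExchange);
        return arguments;
    }

    public static void main(String[] args) {
        // Values taken from the PoC description: 10 seconds, then back to 'ex.main'.
        System.out.println(forDelay(10_000L, "ex.main"));
    }
}
```

A map like this is what would be passed as the arguments parameter when declaring the queue, for example through the RabbitMQ Java client's Channel.queueDeclare or Spring AMQP's Queue constructor.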
Examples of Rejected Messages produced using CustomErrorRecoverer
1. Rejected Due to Schema Validation
Never retried. After a single failed attempt, the message is sent to the dead-letter exchange.
2. Rejected Five Times
Retried four times. After the fifth rejection, the message is sent to the dead-letter exchange.
Transactions
By using Transactions when processing messages, any messages sent during the processing of a message are only actually sent if the message processing completes without error. The support for transactions is all within the code, not in the Rabbit configuration.
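The intended semantics can be pictured with a small in-memory model: sends are buffered while a message is processed, then released on success or discarded on failure. This is purely illustrative and is not how RabbitMQ or Spring implement transactions. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of "messages are only sent if processing succeeds". */
final class TransactedOutbox {

    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Called by processing code; the message is held back until commit. */
    void send(String message) {
        pending.add(message);
    }

    /** Runs the handler; returns true if it completed and its sends were released. */
    boolean process(Runnable handler) {
        try {
            handler.run();
            delivered.addAll(pending); // commit: buffered sends become real
            return true;
        } catch (RuntimeException e) {
            return false; // rollback: buffered sends are never delivered
        } finally {
            pending.clear();
        }
    }

    List<String> delivered() {
        return List.copyOf(delivered);
    }

    public static void main(String[] args) {
        TransactedOutbox outbox = new TransactedOutbox();
        outbox.process(() -> outbox.send("first"));
        outbox.process(() -> {
            outbox.send("second");
            throw new IllegalStateException("processing failed after sending");
        });
        System.out.println(outbox.delivered());
    }
}
```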
The following steps are used to bring Transaction support to message processing.
1. Ensure the RabbitTemplate has channelTransacted set to 'true'.
2. Declare a RabbitTransactionManager bean
3. Ensure that any message processing code annotated with @MessageListener is also annotated with @Transactional
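The three steps above might look roughly like the following Spring configuration fragment. It is a sketch under assumptions rather than the PoC's code: it assumes Spring AMQP and Spring's transaction support are on the classpath, it uses Spring AMQP's @RabbitListener as the listener annotation, and the class names and routing key are hypothetical. The queue and exchange names are the PoC ones described earlier.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@Configuration
@EnableTransactionManagement
class RabbitTransactionConfig {

    // Step 1: sends made through this template take part in the current transaction.
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setChannelTransacted(true);
        return template;
    }

    // Step 2: transaction manager that commits or rolls back the Rabbit channel.
    @Bean
    RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

@Component
class ExampleListener {

    private final RabbitTemplate rabbitTemplate;

    ExampleListener(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Step 3: the listener method runs inside a transaction, so the send below
    // should only take effect if the method returns without throwing.
    @RabbitListener(queues = "q.type.1")
    @Transactional
    public void onMessage(String payload) {
        // "example.routing.key" is a placeholder, not a key used by the PoC.
        rabbitTemplate.convertAndSend("ex.main", "example.routing.key", payload);
    }
}
```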
Optional Schema Validation
This functionality is tied into the CustomMessageRecoverer (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/CustomMessageRecoverer.java) and the ContainerFactoryFactory (https://github.com/Health-Education-England/TIS-EsrRabbitErrorHandlingPoc/blob/master/src/main/java/com/hee/tis/esr/rabbiterrorpoc/errorsupport/ContainerFactoryFactory.java). The ContainerFactoryFactory allows an optional schema to be provided to the 'createContainerFactory' method.
Slack: https://hee-nhs-tis.slack.com/
Jira issues: https://hee-tis.atlassian.net/issues/?filter=14213