Typically to use the @Transactional annotation in Spring Boot, you must configure transaction management. However, unless you are explicitly configuring a Global Transaction or deploying your application to a Java EE Application server , then the default is Local Transaction Management.
Global Transaction Management
When a JTA environment is detected, Spring’s
https://docs.spring.io/spring-boot/docs/2.0.x/reference/html/boot-features-jta.htmlJtaTransactionManageris used to manage transactions. Auto-configured JMS, DataSource, and JPA beans are upgraded to support XA transactions. You can use standard Spring idioms, such as@Transactional, to participate in a distributed transaction. If you are within a JTA environment and still want to use local transactions, you can set thespring.jta.enabledproperty tofalseto disable the JTA auto-configuration.
If your application needs to handle a transaction across multiple resources (JMS and Databases); then consider Distributed Transactions using Atomikos/Bitronix/Narayana/Deploying to Java EE Application Server.
If you are considering Global Distributed Transactions, then the below are good reads to get your started:
- https://docs.spring.io/spring-boot/docs/2.0.x/reference/html/boot-features-jta.html
- For Distributed XA Transactions (Global) https://spring.io/blog/2011/08/15/configuring-spring-and-jta-without-full-java-ee/
Local Transaction Management
A Local Transaction is a transaction that handles only one resource (Database, or JMS, etc)
For the example below, we will implement a JmsListener with support for Local Transaction Management. That means if at any given JMS action whether (Inbound) or (Outbound), if an exception occurs during receive/send, then we expect a transaction to be rolled back.
JMS Local Transaction Configuration:
For the JMS Configuration, we have configured:
- ActiveMQ ConnectionFactory with a Redelivery Policy of Max 2 Redeliveries.
- Set Session Acknowledge Mode to (Session Transacted)
- Configure a Marshalling Converter for Jaxb2Marshaller for Type TEXT messages.
- Use JmsTransactionManager to handle Local Transaction Management for JMS
- Configure DefaultJmsListenerContainerFactory to ensure Transaction is rolled-back and retried instead of marked for roll-back only.
package com.example.order.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.support.converter.MarshallingMessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
@Configuration
@Slf4j
public class JMSConfiguration {
@Bean
public ActiveMQConnectionFactoryCustomizer configureActiveMqRedeliveryPolicy() {
return connectionFactory -> {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(2000);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
log.info("Configuring ActiveMQConnectionFactoryCustomizer with RedeliveryPolicy");
};
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory, @Autowired PlatformTransactionManager jmsTransactionManager) {
var factory = new DefaultJmsListenerContainerFactory();
factory.setTransactionManager(jmsTransactionManager);
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
factory.setMessageConverter(createMarshallingMessageConverter());
log.info("Configuring jmsListenerContainerFactory");
return factory;
}
@Bean
public PlatformTransactionManager transactionManager(@Autowired ConnectionFactory connectionFactory) {
log.info("Configuring jmsTransactionManager");
return new JmsTransactionManager(connectionFactory);
}
/**
* We need to setPackagesToScan for the parent folder that contains all generated classes from our XSD(s)
*/
@Bean
public Jaxb2Marshaller jaxb2Marshaller() {
var jaxb2Marshaller = new Jaxb2Marshaller();
jaxb2Marshaller.setPackagesToScan("com.example.order.model");
return jaxb2Marshaller;
}
@Bean
public MarshallingMessageConverter createMarshallingMessageConverter() {
var marshallingMessageConverter = new MarshallingMessageConverter(jaxb2Marshaller());
marshallingMessageConverter.setTargetType(MessageType.TEXT);
log.info("Configuring MarshallingMessageConverter for TEXT message type");
return marshallingMessageConverter;
}
}
We will configure a @JmsListener to consume messages from the Inbound Queue and enable Transactional support using the @Transactional annotation. The attribute rollbackFor is configured to ensure that the Transaction is marked for roll-back when we throw a Checked Exception.
If no custom rollback rules apply, the transaction will roll back on
RuntimeExceptionandErrorbut not on checked exceptions.
CustomerOrdersInboundListener
package com.example.order.inbound;
import com.example.order.model.Root;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.JmsMessageHeaderAccessor;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class CustomerOrdersInboundListener {
@Autowired
private CustomerOrdersHandler customerOrdersHandler;
@JmsListener(concurrency = "1", destination = "${customer-orders-queue-config.CustomerOrdersInboundQueue}")
@Transactional(rollbackFor = { Exception.class})
public void onMessage(@Payload Root order,JmsMessageHeaderAccessor jmsMessageHeaderAccessor) throws Exception{
log.debug("=========> Inbound Header Value {}", String.valueOf(jmsMessageHeaderAccessor.getHeader("InboundHeader")));
customerOrdersHandler.processMessage(order);
}
}
CustomerOrdersHandler
package com.example.order.inbound;
import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class CustomerOrdersHandler {
@Autowired
private JMSQueuesConfig jmsQueuesConfig;
@Autowired
private Jaxb2Marshaller jaxb2Marshaller;
@Autowired
private CustomerOrdersProcessor customerOrdersProcessor;
@Autowired
private JmsTemplate jmsTemplate;
public void processMessage(Root customerOrders) {
Timer.Sample timer = Timer.start();
try {
customerOrdersProcessor.processOrders(customerOrders);
timer.stop(Metrics.timer("jms_message_receive_duration", "message_type", "Root", "format_received", "TextMessage", "exception", "None"));
} catch (IllegalStateException illegalStateException) {
logExceptionAndPutOnErrorQueue(customerOrders, illegalStateException, timer);
} catch (Exception exception) {
//Any other Exceptions should log, and trigger rollback,
logException(exception, timer);
throw exception;
}
}
private void logException(Exception exception, Timer.Sample timer) {
logExceptionAndPutOnErrorQueue(null, exception, timer);
}
private void logExceptionAndPutOnErrorQueue(Root customerOrders, Exception exception, Timer.Sample timer) {
log.error("Encountered an Exception while processing message", exception);
timer.stop(Metrics.timer("jms_message_receive_duration", "exception", exception.getMessage() != null ? exception.getMessage() : "No Message"));
if (customerOrders != null) {
log.error("Putting Message on ErrorQueue: {}", jmsQueuesConfig.getCustomerOrdersErrorQueue(), exception);
jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersErrorQueue(), customerOrders);
}
}
}
CustomerOrdersProcessor
package com.example.order.inbound;
import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
@Service
@Slf4j
public class CustomerOrdersProcessor {
@Autowired
private JMSQueuesConfig jmsQueuesConfig;
@Autowired
private JmsTemplate jmsTemplate;
public void processOrders(Root customerOrders) {
customerOrders.getOrders().getOrder().forEach(order -> {
order.setProcessedDate(LocalDate.now());
order.setProcessed(Boolean.TRUE);
});
sendProcessedOrdersToServiceAOutbound(customerOrders);
sendProcessedOrdersToServiceBOutbound(customerOrders);
}
private void sendProcessedOrdersToServiceAOutbound(Root customerOrders) {
//Put message on Service A Outbound Queue + Add Extra Headers to it.
jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue(), customerOrders, outboundMessage -> {
outboundMessage.setStringProperty("OutboundHeader", "OutboundHeaderValue");
log.debug("Message: {} is put on Queue: {}", customerOrders, jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
return outboundMessage;
});
}
private void sendProcessedOrdersToServiceBOutbound(Root customerOrders) {
//Put message on Service B Outbound Queue + Add Extra Headers to it.
jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue(), customerOrders, outboundMessage -> {
outboundMessage.setStringProperty("OutboundHeader", "OutboundHeaderValue");
log.debug("Message: {} is put on Queue: {}", customerOrders, jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
return outboundMessage;
});
}
}
Integration Testing Rollback:
To simulate an error happening during a JMS Transaction, we used Springboot @SpyBean which in this case is sufficient as we want to mock partial methods only, and leave all other method invocations to call (real methods) instead.
Expectations:
- If a Checked Exception occurs, Transaction should be rolledback (No Processed Messages should be put on Outbound Queues)
- If an IllegalStateException occurs, Transaction should be marked as complete, and any possibly processed messages on Outbound Queues should not be rolled back.
- If an IllegalStateException occurs, Processed Message should be put on Error Queue
package com.example.order.inbound;
import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.BDDMockito;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import javax.jms.Message;
import javax.xml.transform.stream.StreamSource;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
@ExtendWith({ OutputCaptureExtension.class })
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CustomerOrdersInboundListenerRollbackIT {
@Autowired
private JMSQueuesConfig jmsQueuesConfig;
@SpyBean
private CustomerOrdersHandler customerOrdersHandler;
@SpyBean
private JmsTemplate jmsTemplate;
@Autowired
private Jaxb2Marshaller jaxb2Marshaller;
@BeforeAll
private void setup() {
//Setting ReceiveTimeout, otherwise the code will bock because .receive will wait INDEFINITELY, and we know that there should be no messages
//so fail fast
//Default value is to wait indefinite ==> private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
jmsTemplate.setReceiveTimeout(1000);
}
@Test
void validateRollback_onGenericCheckedException(CapturedOutput capturedOutput) {
BDDMockito.willAnswer(invocation -> {
throw new Exception();
}).given(jmsTemplate).convertAndSend(Mockito.eq(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue()), Mockito.any(Root.class), Mockito.any(MessagePostProcessor.class));
putMessageOnQueue();
//Verify Message is Retried Max 3 Times (RedeliveryPolicy)
Mockito.verify(customerOrdersHandler, Mockito.timeout(1000).times(3)).processMessage(Mockito.any(Root.class));
//Verify Message is put on Outbound Queue A
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("is put on Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT"));
//Verify Rollback
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Participating transaction failed - marking existing transaction as rollback-only"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Rolling back JMS transaction on Session"));
//Assert Processed Order put on Service A Outbound Queue is ROLLED back
Message serviceAOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
Assertions.assertNull(serviceAOutboundMessage);
//Assert Processed Order is NOT put on Service B Outbound Queue
Message serviceBOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
Assertions.assertNull(serviceBOutboundMessage);
//Assert Processed Order is NOT put on Error Queue
Message errorMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersErrorQueue());
Assertions.assertNull(errorMessage);
}
@Test
void validateNoRollback_onIllegalStateException_MessagePutOnErrorQueue(CapturedOutput capturedOutput) {
BDDMockito.willAnswer(invocation -> {
throw new IllegalStateException();
}).given(jmsTemplate).convertAndSend(Mockito.eq(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue()), Mockito.any(Root.class), Mockito.any(MessagePostProcessor.class));
putMessageOnQueue();
//Verify Message is never retried, and put on Error Queue after Exception is Caught.
Mockito.verify(customerOrdersHandler, Mockito.timeout(10000).times(1)).processMessage(Mockito.any(Root.class));
//Verify Message is put on Outbound Queue A
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("is put on Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT"));
//Verify Transaction Commit without Rollback Taking Place
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Putting Message on ErrorQueue: EMBEDDED.AMQ.EXAMPLE.ERROR.ROOT"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Committing JMS transaction on Session"));
//Assert Processed Order put on Service A Outbound Queue is NOT rolled back.
Message serviceAOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
Assertions.assertNotNull(serviceAOutboundMessage);
//Assert Processed Order is NOT put on Service B Outbound Queue
Message serviceBOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
Assertions.assertNull(serviceBOutboundMessage);
//Assert Processed Order is put on Error Queue since IllegalStateException is caught.
Message errorMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersErrorQueue());
Assertions.assertNotNull(errorMessage);
}
private void putMessageOnQueue() {
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("messages/CustomerOrders.xml");
Root rootMessageToSend = Root.class.cast(jaxb2Marshaller.unmarshal(new StreamSource(inputStream)));
//Using MessagePostProcessor to pass extra Message Headers with the (Root) message
jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersInboundQueue(), rootMessageToSend, message -> {
message.setStringProperty("InboundHeader", "InboundHeaderValue");
return message;
});
}
}
Integration Testing Happy Path:
package com.example.order.inbound;
import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.test.annotation.DirtiesContext;
import javax.jms.Message;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.InputStream;
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DirtiesContext
class CustomerOrdersInboundListenerIT {
@Autowired
private JMSQueuesConfig jmsQueuesConfig;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Jaxb2Marshaller jaxb2Marshaller;
@Autowired
private MessageConverter messageConverter;
@Test
void validateSuccessfulProcessing() throws Exception {
putMessageOnQueue();
//Assert Processed Order is put on Service A Outbound Queue
Message outboundServiceAMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
Assertions.assertNotNull(outboundServiceAMessage);
Root processedCustomerOrders = Root.class.cast(messageConverter.fromMessage(outboundServiceAMessage));
//Verify Processing
processedCustomerOrders.getOrders().getOrder().forEach(order -> {
Assertions.assertTrue(order.isProcessed());
});
//Validate all Headers being sent with processed message
Assertions.assertEquals("OutboundHeaderValue", outboundServiceAMessage.getStringProperty("OutboundHeader"));
//Assert Processed Order is put on Service B Outbound Queue
Message outboundServiceBMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
Assertions.assertNotNull(outboundServiceBMessage);
}
private void putMessageOnQueue() throws IOException {
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("messages/CustomerOrders.xml");
Root rootMessageToSend = Root.class.cast(jaxb2Marshaller.unmarshal(new StreamSource(inputStream)));
//Using MessagePostProcessor to pass extra Message Headers with the (Root) message
jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersInboundQueue(), rootMessageToSend, message -> {
message.setStringProperty("InboundHeader", "InboundHeaderValue");
return message;
});
}
}
Important notes to pay attention to:
- If we use only @Transactional annotation, without any JmsTransactionManager configured, then no Transaction logic is going to occur.
- If we use @Transactional annotation without specifying rollbackFor, then checked exceptions will be ignored and Transaction is marked Completed.
2022-03-16 13:07:59.271 DEBUG 29704 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Participating in existing transaction
2022-03-16 13:07:59.271 TRACE 29704 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:07:59.271 DEBUG 29704 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:07:59.286 DEBUG 29704 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor : Putting com.example.order.model.Root@10ad95cd to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:07:59.286 ERROR 29704 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler : Encountered an Exception while processing message
2022-03-16 13:07:59.286 TRACE 29704 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:07:59.286 WARN 29704 --- [ntContainer#0-1] o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
- If we use @Transactional annotation, specify rollbackFor, but without configuring DefaultJmsListenerContainerFactory and setTransactionManager explicitly then the transation is marked as roll-back but will not be retried.
2022-03-16 13:09:21.053 DEBUG 28916 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Participating in existing transaction
2022-03-16 13:09:21.053 TRACE 28916 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:09:21.063 DEBUG 28916 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:09:21.073 DEBUG 28916 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor : Putting com.example.order.model.Root@5962133c to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:09:21.073 ERROR 28916 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler : Encountered an Exception while processing message
2022-03-16 13:09:21.073 TRACE 28916 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:09:21.073 DEBUG 28916 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
- If we use @Transactional annotation, specify rollbackFor, configure DefaultJmsListenerContainerFactory and setTransactionManager then a transaction will be marked roll-back and will be retried however many times the redelivery policy is configured to. Default maximumRedeliveries is 6.
2022-03-16 13:14:03.157 TRACE 9476 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:14:03.170 DEBUG 9476 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:14:03.170 DEBUG 9476 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor : Putting com.example.order.model.Root@6540af74 to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:14:03.170 ERROR 9476 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler : Encountered an Exception while processing message
2022-03-16 13:14:03.186 TRACE 9476 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
2022-03-16 13:14:03.186 WARN 9476 --- [ntContainer#0-1] o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Transactional code has requested rollback
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Initiating transaction rollback
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager : Rolling back JMS transaction on Session
It is useful to setting Log Level Trace for Transaction Management to help you in understanding the workflow of how JmsTransactionManager and Springboot are dealing with Jms Transactions.
spring:
application:
name: customer-orders
activemq:
broker-url: vm://embedded?broker.persistent=true,useShutdownHook=false
in-memory: true
non-blocking-redelivery: true
customer-orders-queue-config:
CustomerOrdersInboundQueue: EMBEDDED.AMQ.EXAMPLE.INBOUND.ROOT
CustomerOrdersOutboundServiceAQueue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
CustomerOrdersOutboundServiceBQueue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.B.ROOT
CustomerOrdersErrorQueue: EMBEDDED.AMQ.EXAMPLE.ERROR.ROOT
logging:
level:
root: INFO
'[com.example.order]': DEBUG
'[org.springframework.transaction.interceptor]': TRACE
'[org.springframework.jms.connection.JmsTransactionManager]': TRACE
The complete code for this is here.