Integration Testing JMS @Transactional with Embedded ActiveMQ


Typically, to use the @Transactional annotation in Spring Boot, you must configure transaction management. Unless you explicitly configure a Global Transaction or deploy your application to a Java EE Application Server, the default is Local Transaction Management.

Global Transaction Management

When a JTA environment is detected, Spring’s JtaTransactionManager is used to manage transactions. Auto-configured JMS, DataSource, and JPA beans are upgraded to support XA transactions. You can use standard Spring idioms, such as @Transactional, to participate in a distributed transaction. If you are within a JTA environment and still want to use local transactions, you can set the spring.jta.enabled property to false to disable the JTA auto-configuration.

https://docs.spring.io/spring-boot/docs/2.0.x/reference/html/boot-features-jta.html

If your application needs to handle a transaction across multiple resources (JMS and databases), consider Distributed Transactions using Atomikos, Bitronix, or Narayana, or deploy to a Java EE Application Server.

If you are considering Global Distributed Transactions, then the below are good reads to get you started:

Local Transaction Management

A Local Transaction is a transaction that handles only one resource (a database, a JMS broker, etc.).


For the example below, we will implement a JmsListener with support for Local Transaction Management. That means that if an exception occurs during any JMS action, whether inbound (receive) or outbound (send), we expect the transaction to be rolled back.
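To make the expected behaviour concrete, here is a minimal sketch of what "local transaction" means for a single JMS resource. It is a toy in-memory model, not ActiveMQ or Spring code: the class name TransactedSessionModel and its methods are hypothetical and exist only for illustration. Sends are buffered until commit, and a rollback discards them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a locally transacted session (hypothetical, not a JMS API). */
final class TransactedSessionModel {

    // Committed messages, visible to consumers, keyed by queue name.
    private final Map<String, Deque<String>> queues = new HashMap<>();
    // Sends made inside the current transaction; not visible until commit.
    private final List<String[]> pendingSends = new ArrayList<>();

    /** Buffers a send until the transaction is committed. */
    void send(String queue, String body) {
        pendingSends.add(new String[] {queue, body});
    }

    /** Publishes every buffered send to its queue. */
    void commit() {
        for (String[] send : pendingSends) {
            queues.computeIfAbsent(send[0], name -> new ArrayDeque<>()).addLast(send[1]);
        }
        pendingSends.clear();
    }

    /** Discards every buffered send; nothing reaches any queue. */
    void rollback() {
        pendingSends.clear();
    }

    /** Number of committed messages waiting on the given queue. */
    int depth(String queue) {
        Deque<String> queue0 = queues.get(queue);
        return queue0 == null ? 0 : queue0.size();
    }

    /** Runs the work in one transaction: commit on success, rollback on failure. Returns true if committed. */
    boolean inTransaction(Runnable work) {
        try {
            work.run();
            commit();
            return true;
        } catch (RuntimeException e) {
            rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        TransactedSessionModel session = new TransactedSessionModel();
        session.inTransaction(() -> {
            session.send("OUTBOUND.A", "order-1");
            throw new IllegalArgumentException("send to OUTBOUND.B failed");
        });
        // The send to OUTBOUND.A was part of the failed transaction, so it is gone.
        System.out.println("OUTBOUND.A depth after rollback: " + session.depth("OUTBOUND.A")); // 0
    }
}
```

The point of the model is that a message sent earlier in the same transaction never becomes visible if a later step fails, which is the behaviour the integration tests further down check against the real broker.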


JMS Local Transaction Configuration:

For the JMS Configuration, we have configured:

  • An ActiveMQ ConnectionFactory with a Redelivery Policy of at most 2 redeliveries.
  • The Session Acknowledge Mode set to SESSION_TRANSACTED.
  • A MarshallingMessageConverter backed by Jaxb2Marshaller, targeting TEXT messages.
  • A JmsTransactionManager to handle Local Transaction Management for JMS.
  • A DefaultJmsListenerContainerFactory wired with that transaction manager, so that a failed Transaction is rolled back and retried instead of only being marked rollback-only.
package com.example.order.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.support.converter.MarshallingMessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.transaction.PlatformTransactionManager;

import javax.jms.ConnectionFactory;
import javax.jms.Session;

@Configuration
@Slf4j
public class JMSConfiguration {

    @Bean
    public ActiveMQConnectionFactoryCustomizer configureActiveMqRedeliveryPolicy() {

        return connectionFactory -> {

            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            redeliveryPolicy.setInitialRedeliveryDelay(2000);
            redeliveryPolicy.setBackOffMultiplier(2);
            redeliveryPolicy.setUseExponentialBackOff(true);
            redeliveryPolicy.setMaximumRedeliveries(2);

            connectionFactory.setRedeliveryPolicy(redeliveryPolicy);

            log.info("Configuring ActiveMQConnectionFactoryCustomizer with RedeliveryPolicy");

        };
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory, @Autowired PlatformTransactionManager jmsTransactionManager) {

        var factory = new DefaultJmsListenerContainerFactory();
        factory.setTransactionManager(jmsTransactionManager);
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setMessageConverter(createMarshallingMessageConverter());

        log.info("Configuring jmsListenerContainerFactory");

        return factory;
    }

    @Bean
    public PlatformTransactionManager transactionManager(@Autowired ConnectionFactory connectionFactory) {

        log.info("Configuring jmsTransactionManager");

        return new JmsTransactionManager(connectionFactory);
    }

    /**
     * We need to setPackagesToScan for the parent folder that contains all generated classes from our XSD(s)
     */
    @Bean
    public Jaxb2Marshaller jaxb2Marshaller() {

        var jaxb2Marshaller = new Jaxb2Marshaller();

        jaxb2Marshaller.setPackagesToScan("com.example.order.model");

        return jaxb2Marshaller;
    }

    @Bean
    public MarshallingMessageConverter createMarshallingMessageConverter() {

        var marshallingMessageConverter = new MarshallingMessageConverter(jaxb2Marshaller());
        marshallingMessageConverter.setTargetType(MessageType.TEXT);

        log.info("Configuring MarshallingMessageConverter for TEXT message type");

        return marshallingMessageConverter;
    }

}
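
As a rough aid for reading the RedeliveryPolicy above, the sketch below works out the nominal redelivery schedule. It is a back-of-the-envelope model, not ActiveMQ code: the class RedeliverySchedule is hypothetical, and it assumes that the first redelivery waits for the initial delay and that each later delay is the previous one multiplied by the back-off multiplier. Actual timing depends on the broker, the consumer, and the listener container settings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that models a nominal exponential back-off schedule. */
final class RedeliverySchedule {

    /** Delay in milliseconds before each redelivery, under the assumptions stated above. */
    static List<Long> delaysMillis(long initialDelay, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelay;
        for (int i = 0; i < maximumRedeliveries; i++) {
            delays.add(delay);
            delay = (long) (delay * backOffMultiplier);
        }
        return delays;
    }

    /** Total delivery attempts: the first delivery plus every allowed redelivery. */
    static int totalDeliveries(int maximumRedeliveries) {
        return 1 + maximumRedeliveries;
    }

    public static void main(String[] args) {
        // Values taken from the configuration above: 2000 ms, multiplier 2, 2 redeliveries.
        System.out.println(delaysMillis(2000, 2, 2)); // [2000, 4000]
        System.out.println(totalDeliveries(2));       // 3
    }
}
```

With maximumRedeliveries set to 2, the listener can see the same message three times in total, which is why the rollback test below expects three invocations of the handler.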


We will configure a @JmsListener to consume messages from the Inbound Queue and enable Transactional support using the @Transactional annotation. The attribute rollbackFor is configured to ensure that the Transaction is marked for roll-back when we throw a Checked Exception.

If no custom rollback rules apply, the transaction will roll back on RuntimeException and Error but not on checked exceptions.
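
The rule quoted above can be sketched in a few lines of plain Java. This is a simplified model for illustration, not Spring's implementation: RollbackRuleModel and its methods are hypothetical names, and the rollbackFor handling here only checks whether the exception is an instance of any listed type.

```java
/** Simplified, hypothetical model of the rollback decision described above. */
final class RollbackRuleModel {

    /** Default rule: roll back on RuntimeException and Error, but not on checked exceptions. */
    static boolean rollsBackByDefault(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    /** With rollbackFor types: any listed type (or subtype) forces a rollback; otherwise the default applies. */
    static boolean rollsBack(Throwable ex, Class<?>... rollbackFor) {
        for (Class<?> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true;
            }
        }
        return rollsBackByDefault(ex);
    }

    public static void main(String[] args) {
        Throwable checked = new Exception("checked");
        System.out.println(rollsBack(checked));                         // false
        System.out.println(rollsBack(checked, Exception.class));        // true
        System.out.println(rollsBack(new IllegalStateException("x")));  // true
    }
}
```

This is why the listener below declares rollbackFor = { Exception.class }: without it, a checked exception would not mark the transaction for rollback.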


CustomerOrdersInboundListener

package com.example.order.inbound;

import com.example.order.model.Root;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.JmsMessageHeaderAccessor;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class CustomerOrdersInboundListener {

    @Autowired
    private CustomerOrdersHandler customerOrdersHandler;

    @JmsListener(concurrency = "1", destination = "${customer-orders-queue-config.CustomerOrdersInboundQueue}")
    @Transactional(rollbackFor = { Exception.class})
    public void onMessage(@Payload Root order, JmsMessageHeaderAccessor jmsMessageHeaderAccessor) throws Exception {

        log.debug("=========> Inbound Header Value {}", String.valueOf(jmsMessageHeaderAccessor.getHeader("InboundHeader")));
        customerOrdersHandler.processMessage(order);

    }

}


CustomerOrdersHandler

package com.example.order.inbound;

import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class CustomerOrdersHandler {

    @Autowired
    private JMSQueuesConfig jmsQueuesConfig;

    @Autowired
    private Jaxb2Marshaller jaxb2Marshaller;

    @Autowired
    private CustomerOrdersProcessor customerOrdersProcessor;

    @Autowired
    private JmsTemplate jmsTemplate;

    public void processMessage(Root customerOrders) {

        Timer.Sample timer = Timer.start();

        try {

            customerOrdersProcessor.processOrders(customerOrders);

            timer.stop(Metrics.timer("jms_message_receive_duration", "message_type", "Root", "format_received", "TextMessage", "exception", "None"));

        } catch (IllegalStateException illegalStateException) {

            logExceptionAndPutOnErrorQueue(customerOrders, illegalStateException, timer);

        } catch (Exception exception) {
            // Any other exception is logged and rethrown so that the transaction is rolled back.
            logException(exception, timer);

            throw exception;
        }

    }

    private void logException(Exception exception, Timer.Sample timer) {
        logExceptionAndPutOnErrorQueue(null, exception, timer);
    }

    private void logExceptionAndPutOnErrorQueue(Root customerOrders, Exception exception, Timer.Sample timer) {

        log.error("Encountered an Exception while processing message", exception);

        timer.stop(Metrics.timer("jms_message_receive_duration", "exception", exception.getMessage() != null ? exception.getMessage() : "No Message"));

        if (customerOrders != null) {

            log.error("Putting Message on ErrorQueue: {}", jmsQueuesConfig.getCustomerOrdersErrorQueue(), exception);

            jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersErrorQueue(), customerOrders);
        }
    }

}

CustomerOrdersProcessor

package com.example.order.inbound;

import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import java.time.LocalDate;

@Service
@Slf4j
public class CustomerOrdersProcessor {

    @Autowired
    private JMSQueuesConfig jmsQueuesConfig;

    @Autowired
    private JmsTemplate jmsTemplate;

    public void processOrders(Root customerOrders) {

        customerOrders.getOrders().getOrder().forEach(order -> {
            order.setProcessedDate(LocalDate.now());
            order.setProcessed(Boolean.TRUE);
        });

        sendProcessedOrdersToServiceAOutbound(customerOrders);
        sendProcessedOrdersToServiceBOutbound(customerOrders);

    }

    private void sendProcessedOrdersToServiceAOutbound(Root customerOrders) {

        //Put message on Service A Outbound  Queue + Add Extra Headers to it.
        jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue(), customerOrders, outboundMessage -> {

            outboundMessage.setStringProperty("OutboundHeader", "OutboundHeaderValue");

            log.debug("Message: {} is put on Queue: {}", customerOrders, jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
            return outboundMessage;
        });

    }

    private void sendProcessedOrdersToServiceBOutbound(Root customerOrders) {

        //Put message on Service B Outbound  Queue + Add Extra Headers to it.
        jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue(), customerOrders, outboundMessage -> {

            outboundMessage.setStringProperty("OutboundHeader", "OutboundHeaderValue");

            log.debug("Message: {} is put on Queue: {}", customerOrders, jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
            return outboundMessage;
        });

    }

}


Integration Testing Rollback:

To simulate an error during a JMS Transaction, we use Spring Boot's @SpyBean. A spy is sufficient here because we only want to stub selected methods and let every other invocation call the real method.

Expectations:

  • If a Checked Exception occurs, the Transaction should be rolled back (no processed messages should be put on the Outbound Queues).
  • If an IllegalStateException occurs, the Transaction should be committed, and any processed messages already sent to the Outbound Queues should not be rolled back.
  • If an IllegalStateException occurs, the message should be put on the Error Queue.
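
The expectations above can be read as a small decision table. The sketch below encodes it in plain Java. HandlerOutcomeModel and its enum are hypothetical names used only to summarise the intended behaviour; they are not part of the application.

```java
/** Hypothetical summary of the outcomes the tests below expect. */
final class HandlerOutcomeModel {

    enum Outcome { COMMIT, COMMIT_AND_SEND_TO_ERROR_QUEUE, ROLLBACK_AND_REDELIVER }

    /** Maps the exception raised while processing (null if none) to the expected outcome. */
    static Outcome outcomeFor(Throwable failure) {
        if (failure == null) {
            return Outcome.COMMIT;
        }
        if (failure instanceof IllegalStateException) {
            // Caught by the handler: the message is diverted and the transaction still commits.
            return Outcome.COMMIT_AND_SEND_TO_ERROR_QUEUE;
        }
        // Anything else propagates out of the listener and triggers a rollback.
        return Outcome.ROLLBACK_AND_REDELIVER;
    }

    public static void main(String[] args) {
        System.out.println(outcomeFor(new Exception("checked")));               // ROLLBACK_AND_REDELIVER
        System.out.println(outcomeFor(new IllegalStateException("bad state"))); // COMMIT_AND_SEND_TO_ERROR_QUEUE
    }
}
```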
package com.example.order.inbound;

import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.BDDMockito;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

import javax.jms.Message;
import javax.xml.transform.stream.StreamSource;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

@ExtendWith({ OutputCaptureExtension.class })
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CustomerOrdersInboundListenerRollbackIT {

    @Autowired
    private JMSQueuesConfig jmsQueuesConfig;

    @SpyBean
    private CustomerOrdersHandler customerOrdersHandler;

    @SpyBean
    private JmsTemplate jmsTemplate;

    @Autowired
    private Jaxb2Marshaller jaxb2Marshaller;

    @BeforeAll
    void setup() {
        // Set a receive timeout; otherwise the code will block, because receive() waits INDEFINITELY by default
        // and some of the queues checked below are expected to be empty. Fail fast instead.
        // JmsTemplate default ==> private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT;
        jmsTemplate.setReceiveTimeout(1000);
    }

    @Test
    void validateRollback_onGenericCheckedException(CapturedOutput capturedOutput) {

        BDDMockito.willAnswer(invocation -> {
            throw new Exception();
        }).given(jmsTemplate).convertAndSend(Mockito.eq(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue()), Mockito.any(Root.class), Mockito.any(MessagePostProcessor.class));

        putMessageOnQueue();

        //Verify Message is delivered 3 times in total: 1 initial delivery + 2 redeliveries (RedeliveryPolicy)
        Mockito.verify(customerOrdersHandler, Mockito.timeout(1000).times(3)).processMessage(Mockito.any(Root.class));

        //Verify the send to Outbound Queue A was attempted (logged) before the failure
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("is put on Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT"));

        //Verify Rollback
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Participating transaction failed - marking existing transaction as rollback-only"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Rolling back JMS transaction on Session"));

        //Assert Processed Order put on Service A Outbound Queue is ROLLED back
        Message serviceAOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
        Assertions.assertNull(serviceAOutboundMessage);

        //Assert Processed Order is NOT put on Service B Outbound Queue
        Message serviceBOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
        Assertions.assertNull(serviceBOutboundMessage);

        //Assert Processed Order is NOT put on Error Queue
        Message errorMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersErrorQueue());
        Assertions.assertNull(errorMessage);

    }

    @Test
    void validateNoRollback_onIllegalStateException_MessagePutOnErrorQueue(CapturedOutput capturedOutput) {

        BDDMockito.willAnswer(invocation -> {
            throw new IllegalStateException();
        }).given(jmsTemplate).convertAndSend(Mockito.eq(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue()), Mockito.any(Root.class), Mockito.any(MessagePostProcessor.class));

        putMessageOnQueue();

        //Verify Message is never retried, and put on Error Queue after Exception is Caught.
        Mockito.verify(customerOrdersHandler, Mockito.timeout(10000).times(1)).processMessage(Mockito.any(Root.class));

        //Verify Message is put on Outbound Queue A
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("is put on Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT"));

        //Verify Transaction Commit without Rollback Taking Place
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Putting Message on ErrorQueue: EMBEDDED.AMQ.EXAMPLE.ERROR.ROOT"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]"));
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> capturedOutput.getOut().contains("Committing JMS transaction on Session"));

        //Assert Processed Order put on Service A Outbound Queue is NOT rolled back.
        Message serviceAOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
        Assertions.assertNotNull(serviceAOutboundMessage);

        //Assert Processed Order is NOT put on Service B Outbound Queue
        Message serviceBOutboundMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
        Assertions.assertNull(serviceBOutboundMessage);

        //Assert Processed Order is put on Error Queue since IllegalStateException is caught.
        Message errorMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersErrorQueue());
        Assertions.assertNotNull(errorMessage);

    }

    private void putMessageOnQueue() {
        InputStream inputStream = getClass().getClassLoader().getResourceAsStream("messages/CustomerOrders.xml");

        Root rootMessageToSend = Root.class.cast(jaxb2Marshaller.unmarshal(new StreamSource(inputStream)));

        //Using MessagePostProcessor to pass extra Message Headers with the (Root) message
        jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersInboundQueue(), rootMessageToSend, message -> {
            message.setStringProperty("InboundHeader", "InboundHeaderValue");
            return message;
        });
    }

}



Integration Testing Happy Path:

package com.example.order.inbound;

import com.example.order.config.JMSQueuesConfig;
import com.example.order.model.Root;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.test.annotation.DirtiesContext;

import javax.jms.Message;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.InputStream;

@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DirtiesContext
class CustomerOrdersInboundListenerIT {

    @Autowired
    private JMSQueuesConfig jmsQueuesConfig;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Jaxb2Marshaller jaxb2Marshaller;

    @Autowired
    private MessageConverter messageConverter;

    @Test
    void validateSuccessfulProcessing() throws Exception {

        putMessageOnQueue();

        //Assert Processed Order is put on Service A Outbound Queue
        Message outboundServiceAMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceAQueue());
        Assertions.assertNotNull(outboundServiceAMessage);

        Root processedCustomerOrders = Root.class.cast(messageConverter.fromMessage(outboundServiceAMessage));

        //Verify Processing
        processedCustomerOrders.getOrders().getOrder().forEach(order -> {
            Assertions.assertTrue(order.isProcessed());
        });

        //Validate all Headers being sent with processed message
        Assertions.assertEquals("OutboundHeaderValue", outboundServiceAMessage.getStringProperty("OutboundHeader"));

        //Assert Processed Order is put on Service B Outbound Queue
        Message outboundServiceBMessage = jmsTemplate.receive(jmsQueuesConfig.getCustomerOrdersOutboundServiceBQueue());
        Assertions.assertNotNull(outboundServiceBMessage);

    }

    private void putMessageOnQueue() throws IOException {
        InputStream inputStream = getClass().getClassLoader().getResourceAsStream("messages/CustomerOrders.xml");

        Root rootMessageToSend = Root.class.cast(jaxb2Marshaller.unmarshal(new StreamSource(inputStream)));

        //Using MessagePostProcessor to pass extra Message Headers with the (Root) message
        jmsTemplate.convertAndSend(jmsQueuesConfig.getCustomerOrdersInboundQueue(), rootMessageToSend, message -> {
            message.setStringProperty("InboundHeader", "InboundHeaderValue");
            return message;
        });
    }

}



Important notes to pay attention to:

  • If we use only the @Transactional annotation, without any JmsTransactionManager configured, then no Transaction logic is going to occur.
  • If we use the @Transactional annotation without specifying rollbackFor, then checked exceptions do not trigger a rollback and the Transaction is marked Completed.
2022-03-16 13:07:59.271 DEBUG 29704 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Participating in existing transaction
2022-03-16 13:07:59.271 TRACE 29704 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:07:59.271 DEBUG 29704 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener    : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:07:59.286 DEBUG 29704 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor    : Putting com.example.order.model.Root@10ad95cd to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:07:59.286 ERROR 29704 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler  : Encountered an Exception while processing message
2022-03-16 13:07:59.286 TRACE 29704 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:07:59.286  WARN 29704 --- [ntContainer#0-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.

  • If we use the @Transactional annotation and specify rollbackFor, but do not configure DefaultJmsListenerContainerFactory with setTransactionManager explicitly, then the transaction is marked rollback-only but will not be retried.
2022-03-16 13:09:21.053 DEBUG 28916 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Participating in existing transaction
2022-03-16 13:09:21.053 TRACE 28916 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:09:21.063 DEBUG 28916 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener    : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:09:21.073 DEBUG 28916 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor    : Putting com.example.order.model.Root@5962133c to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:09:21.073 ERROR 28916 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler  : Encountered an Exception while processing message

2022-03-16 13:09:21.073 TRACE 28916 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:09:21.073 DEBUG 28916 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Participating transaction failed - marking existing transaction as rollback-only

  • If we use the @Transactional annotation, specify rollbackFor, and configure DefaultJmsListenerContainerFactory with setTransactionManager, then the transaction is rolled back and the message is retried as many times as the redelivery policy allows. The default maximumRedeliveries is 6.
2022-03-16 13:14:03.157 TRACE 9476 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage]
2022-03-16 13:14:03.170 DEBUG 9476 --- [ntContainer#0-1] c.e.o.i.CustomerOrdersInboundListener    : =========> Inbound Header Value InboundHeaderValue
2022-03-16 13:14:03.170 DEBUG 9476 --- [ntContainer#0-1] c.e.o.inbound.CustomerOrdersProcessor    : Putting com.example.order.model.Root@6540af74 to Queue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
2022-03-16 13:14:03.170 ERROR 9476 --- [ntContainer#0-1] c.e.order.inbound.CustomerOrdersHandler  : Encountered an Exception while processing message

2022-03-16 13:14:03.186 TRACE 9476 --- [ntContainer#0-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [com.example.order.inbound.CustomerOrdersInboundListener.onMessage] after exception: java.lang.Exception
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Participating transaction failed - marking existing transaction as rollback-only
2022-03-16 13:14:03.186  WARN 9476 --- [ntContainer#0-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.


2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Transactional code has requested rollback
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Initiating transaction rollback
2022-03-16 13:14:03.186 DEBUG 9476 --- [ntContainer#0-1] o.s.j.connection.JmsTransactionManager   : Rolling back JMS transaction on Session


It is useful to set the log level to TRACE for Transaction Management. This helps you understand how JmsTransactionManager and Spring Boot handle JMS Transactions.

spring:
  application:
    name: customer-orders
  activemq:
    broker-url: vm://embedded?broker.persistent=true,useShutdownHook=false
    in-memory: true
    non-blocking-redelivery: true

customer-orders-queue-config:
    CustomerOrdersInboundQueue: EMBEDDED.AMQ.EXAMPLE.INBOUND.ROOT
    CustomerOrdersOutboundServiceAQueue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.A.ROOT
    CustomerOrdersOutboundServiceBQueue: EMBEDDED.AMQ.EXAMPLE.OUTBOUND.B.ROOT
    CustomerOrdersErrorQueue:  EMBEDDED.AMQ.EXAMPLE.ERROR.ROOT

logging:
  level:
    root: INFO
    '[com.example.order]': DEBUG
    '[org.springframework.transaction.interceptor]': TRACE
    '[org.springframework.jms.connection.JmsTransactionManager]': TRACE

The complete code for this is here.
