XA transactions using Spring

原文

JTA/XA transactions without the J2EE container.

Using J(2)EE application server has been a norm when high-end features like transactions, security, availability, and scalability are mandatory. There are very few options for java applications, which require only a subset of these enterprise features and, more often than not, organizations go for a full-blown J(2)EE server. This article focuses on distributed transactions using the JTA (Java Transaction API) and will elaborate on how distributed transactions (also called XA) can be used in a standalone java application, without a JEE server, using the widely popular Spring framework and the open source JTA implementations of JBossTS, Atomikos and Bitronix.
Distributed transaction processing systems are designed to facilitate transactions that span heterogeneous, transaction-aware resources in a distributed environment. Using distributed transactions, an application can accomplish tasks such as retrieving a message from a message queue and updating one or more databases in a single transactional unit adhering to the ACID (Atomicity, Consistency, Isolation and Durability) criteria. This article outlines some of the use cases where distributed transactions (XA) could be used and how an application can achieve transactional processing using JTA along with the best of the breed technologies. The main focus is on using Spring as a server framework and how one can integrate various JTA implementations seamlessly for enterprise level distributed transactions.

XA Transactions and the JTA API

Since the scope of the article is limited to using JTA implementations using the Spring framework, we will briefly touch upon the architectural concepts of distributed transaction processing.

XA Transactions

The X/Open Distributed Transaction Processing, designed by Open Group(a vendor consortium), defines a standard communication architecture that allows multiple applications to share resources provided by multiple resource managers, and allows their work to be coordinated into global transactions. The XA interfaces enable the resource managers to join transactions, to perform 2PC (two phase commit) and to recover in-doubt transactions following a failure.

*Figure 1: Conceptual model of the DTP environment.*

As shown in Figure 1, the model has the following interfaces:

  1. The interface between theapplication and the resource manager allows an application to call a resource manager directly, using the resource manager's native API or native XA API depending on if the transaction needs to be managed by the transaction monitor or not.
  2. The interface between the application and the transaction monitor (TX interface), lets the application call the transaction monitor for all transaction needs like starting a transaction, ending a transaction, rollback of a transaction etc.
  3. The interface between the transaction monitor and the resource manager is the XA interface. This is the interface, which facilitates the two-phase commit protocol to achieve distributed transactions under one global transaction.

JTA API

The JTA API, defined by Sun Microsystems, is a high-level API which defines interfaces between a transaction manager and the parties involved in a distributed transaction system. The JTA primarily consists of three parts:

  • A high-level application interface for an application to demarcate transaction boundaries. The UserTransaction interface encapsulates this.
  • A Java mapping of the industry standard X/Open XA protocol (Item #3 in the X/Open interfaces listed above). This encompasses the interfaces defined in thejavax.transaction.xa package, which consists of XAResource, Xid and XAException interfaces.
  • A high-level transaction manager interface that allows an application server to manage transactions for a user application. The TransactionManager, Transaction, Status and Synchronization interfaces pretty much define how the application server manages transactions.

Now that we have a brief summary of what JTA and XA standards are, let us go through some use cases to demonstrate the integration of different JTA implementations using Spring, for our hypothetical java application.

Our Use Cases

To demonstrate the integration of different JTA implementations with Spring, we are going to use the following use cases:

  1. Update two database resources in a global transaction*- We will use JBossTS as the JTA implementation. In the process, we will see how we can declaratively apply distributed transaction semantics to simple POJO's.
  2. Update a database and send a JMS message to a queue in a global transaction*- We will demonstrate integration with both Atomikos and Bitronix JTA implementations.
  3. Consume a JMS message and update a database in a global transaction- We will use both Atomikos and Bitronix JTA implementations. In the process, we will see how we can emulate transactional MDP's (Message Driven POJO's).

We will be using MySQL for the databases and Apache ActiveMQ as our JMS messaging provider for our use cases. Before going through the use cases, let us briefly look at the technology stack we are going to use.

Spring framework

Spring framework has established itself as one of the most useful and productive frameworks in the Java world. Among the many benefits it provides, it also provides the necessary plumbing for running an application with any JTA implementation. This makes it unique in the sense that an application doesn't need to run in a JEE container to get the benefits of JTA transactions. Please note that Spring doesn't provide any JTA implementation as such. The only task from the user perspective is to make sure that the JTA implementation is wired to use the Spring framework's JTA support. This is what we will be focusing on in the following sections.

Transactions in Spring

Spring provides both programmatic and declarative transaction management using a lightweight transaction framework. This makes it easy for standalone java applications to include transactions (JTA or non-JTA) either programmatically or declaratively. The programmatic transaction demarcation can be accomplished by using the API exposed by the PlatformTransactionManager interface and its sub-classes. On the other hand, the declarative transaction demarcation uses an AOP (Aspect Oriented Programming) based solution. For this article, we will explore the declarative transaction demarcation, since it is less intrusive and easy to understand, using the TransactionProxyFactoryBean class. The transaction management strategy, in our case, is to use the JtaTransactionManager, since we have multiple resources to deal with. If there is only a single resource, there are several choices depending on the underlying technology and all of them implement the PlatformTransactionManager interface. For example, for Hibernate, one can choose to use HibernateTransactionManager and for JDO based persistence, one can use the JdoTransactionManager. There is also a JmsTransactionManager, which is meant for local transactions only.

Spring's transaction framework also provides the necessary tools for applications to define the transaction propagation behavior, transaction isolation and so forth. For declarative transaction management, the TransactionDefinition interface specifies the propagation behavior, which is very much similar to EJB CMT attributes. The TransactionAttribute interface allows the application to specify which exceptions will cause a rollback and which ones will be committed. These are the two crucial interfaces, which make the declarative transaction management very easy to use and configure, and we will see as we go through our use cases.

Asynchronous Message Consumption using Spring

Spring has always supported sending messages using JMS API via its JMS abstraction layer. It employs a callback mechanism, which consists of a message creator and a JMS template that actually sends the message created by the message creator.
Since the release of Spring 2.0, asynchronous message consumption has been made possible using the JMS API. Though Spring provides different message listener containers, for consuming the messages, the one that is mostly suited to both JEE and J2SE environments is theDefaultMessageListenerContainer(DMLC). The DefaultMessageListenerContainer extends theAbstractPollingMessageListenerContainer class and provides full support for JMS 1.1 API. It primarily uses the JMS synchronous receive calls( MessageConsumer.receive() ) inside a loop and allows for transactional reception of messages. For J(2)SE environment, the stand-alone JTA implementations can be wired to use the Spring'sJtaTransactionManager , which will be demonstrated in the following sections.

The JTA implementations

JBossTS

JBossTS, formerly known as Arjuna Transaction Service, comes with a very robust implementation, which supports both JTA and JTS API. JBossTS comes with a recovery service, which could be run as a separate process from your application processes. Unfortunately, it doesn't support out-of-the box integration with Spring, but it is easy to integrate as we will see in our exercise. Also there is no support for JMS resources, only database resources are supported.

Atomikos Transaction Essentials

Atomikos's JTA implementation has been open sourced very recently. The documentation and literature on the internet shows that it is a production quality implementation, which also supports recovery and some exotic features beyond the JTA API. Atomikos provides out of the box Spring integration along with some nice examples. Atomikos supports both database and JMS resources. It also provides support for pooled connections for both database and JMS resources.

Bitronix JTA

Bitronix's JTA implementation is fairly new and is still in beta. It also claims to support transaction recovery as good as or even better than some of the commercial products. Bitronix provides support for both database and JMS resources. Bitronix also provides connection pooling and session pooling out of the box.

XA Resources

JMS Resources

The JMS API specification does not require that a provider supports distributed transactions, but if the provider does, it should be done via the JTA XAResource API. So the provider should expose its JTA support using the XAConnectionFactory, XAConnection and XASession interfaces. Fortunately Apache's ActiveMQ provides the necessary implementation for handling XA transactions. Our project (see Resources section) also includes configuration files for using TIBCO EMS (JMS server from TIBCO) and one can notice that the configuration files require minimal changes when the providers are switched.

Database Resources

MySQL database provides an XA implementation and works only for their InnoDB engines. It also provides a decent JDBC driver, which supports the JTA/XA protocol. Though there are some restrictions placed on the usage of some XA features, for the purposes of the article, it is good enough.

The Environment

Setup for Databases:

The first database mydb1 will be used for use cases 1 and 2 and will have the following table:

mysql> use mydb1;
Database changed
mysql> select * from msgseq;
+---------+-----------+-------+
| APPNAME | APPKEY    | VALUE |
+---------+-----------+-------+
| spring  | execution |    13 |
+---------+-----------+-------+
1 row in set (0.00 sec)

The second database mydb2 will be used for use case 3 and will have the following table:

mysql> use mydb2;
Database changed
mysql> select * from msgseq;
+---------+------------+-------+
| APPNAME | APPKEY     | VALUE |
+---------+------------+-------+
| spring  | aaaaa      | 15    |
| spring  | allocation | 13    |
+---------+------------+-------+
2 rows in set (0.00 sec)

Setup for JMS provider (for use case 2 and 3)

For creating a physical destination in ActiveMQ, do the following:

  1. Add the following destination to the activmq.xmlfile under the conf folder of ActiveMQ installation:
<destinations>
    <queue physicalName="test.q1" />
</destinations>
  1. Add the following line of code in the jndi.properties file to include the jndi name for the destination and make sure the file is in the classpath: queue.test.q1=test.q1
Figure 2: UseCase1 updates two databases in a global transaction.

Let us assume that our application has a requirement where it needs to persist a sequence number, associated with an event, in two different databases(mydb1 and mydb2), within the same transactional unit of work as shown in Figure 2 above. To achieve this, let us write a simple method in our POJO class, which updates the two databases. The code for our EventHandler
POJO class looks as follows:

public void handleEvent(boolean fail) throws Exception {
     MessageSequenceDAO dao = (MessageSequenceDAO) springContext.getBean("sequenceDAO");
     int value = 13;
     String app = "spring";
     String appKey = "execution";
     int upCnt = dao.updateSequence(value, app, appKey);
     log.debug(" sql updCnt->" + upCnt);
     
     if (springContext.containsBean("sequenceDAO2")) {
      // this is for use case 1 with JBossTS
      MessageSequenceDAO dao2 = (MessageSequenceDAO) springContext.getBean("sequenceDAO2");
      appKey = "allocation";
      upCnt = dao2.updateSequence(value, app, appKey);
      log.debug(" sql updCnt2->" + upCnt);
     }

        ...
     
     if (fail) {
      throw new RuntimeException("Simulating Rollback by throwing Exception !!");
     }
    }

As you can figure out, all we are doing in the first segment of the code is getting a reference to the MessageSequenceDAO object representing the first database and updating the value of the sequence.
As you can guess, the next piece of code updates the sequence in the second database. The last if statement throws a run-time exception when we run the code with the boolean value set to "true". This is for simulating a run-time exception to test if the transaction manager has successfully rolled back the global transaction.
Let us look at the spring configuration file to see how we configured our DAO classes and the datasources:

<bean id="dsProps" class="java.util.Properties">
     <constructor-arg>
       <props>
        <prop key="user">root</prop>
        <prop key="password">murali</prop>
        <prop key="DYNAMIC_CLASS">com.findonnet.service.transaction.jboss.jdbc.Mysql</prop>
       </props>
     </constructor-arg>
 </bean>
 
 <bean id="dataSource1" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
  <property name="driverClassName">
   <value>com.arjuna.ats.jdbc.TransactionalDriver</value>
  </property>
  <property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb1"/>
  <property name="connectionProperties">
    <ref bean="dsProps"/>
  </property>
 </bean>
 
 <bean id="dataSource2" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
  <property name="driverClassName">
   <value>com.arjuna.ats.jdbc.TransactionalDriver</value>
  </property>
  <property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb2"/>
  <property name="connectionProperties">
    <ref bean="dsProps"/>
  </property>
 </bean>
 
 <bean id="sequenceDAO" class="com.findonnet.persistence.MessageSequenceDAO">
  <property name="dataSource">
   <ref bean="dataSource1"/>
  </property>
 </bean>
 
 <bean id="sequenceDAO2" class="com.findonnet.persistence.MessageSequenceDAO">
  <property name="dataSource">
   <ref bean="dataSource2"/>
  </property>
 </bean>

The above beans define the two datasources for the two databases. To use JBossTS's TransactionalDriver, we need to register the database with either the JNDI bindings or with Dynamic class instantiations. We will be using the later, which requires us to implement the DynamicClass
interface. The bean definition for dsProps shows that we are going to use the com.findonnet.service.transaction.jboss.jdbc.Mysql
class, that we wrote, which implements the DynamicClass
interface. Since JBossTS doesn't provide any out of the box wrappers for the MySQL database, we had to do this. In addition to this, the code relies on the jdbc URL starting with "jdbc:arjuna", otherwise the JBossTS code throws errors.

The implementation of the DynamicClass interface is very simple, all we have to do is to implement the methods getDataSource() and the shutdownDataSource() methods. The getDataSource() method returns an appropriate XADataSource object, which, in our case is the com.mysql.jdbc.jdbc2.optional.MysqlXADataSource.

Please note that both the datasources are configured to use the DriverManagerDataSource, from Spring, which doesn't use any connection pooling and is not recommended for production use. The alternative is to use a pooled datasource, which can handle XA datasource pooling.

It's now time for us to look at providing transactional semantics to our handleEvent() method in our EventHandler class:

<bean id="eventHandlerTarget" class="com.findonnet.messaging.EventHandler"></bean>
 <bean id="eventHandler" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
   <property name="transactionManager"><ref bean="transactionManager" /></property>
   <property name="target"><ref bean="eventHandlerTarget"/></property>
   <property name="transactionAttributes">
     <props>
      <prop key="handle*">PROPAGATION_REQUIRED,-Exception</prop>
     </props>
   </property>
 </bean>

Here we define the TransactionProxyFactoryBean
and pass in the transactionManager reference, which we will see later, with the transaction attributes "PROPAGATION_REQUIRED,-Exception". This will be applied to the target bean eventHandlerTarget and only on methods, which start with "handle" (notice the handle*). To put it in simple words, what we are asking Spring framework to do is; for all method invocations on the target object, whose method names start with"handle", please apply the transaction attributes "PROPAGATION_REQUIRED,-Exception". Behind the scenes, the Spring framework will create a CGLIB based proxy, which intercepts all the calls on the EventHandler for the method names, that start with "handle". In case of any Exception, within the method call, the current transaction will be rolled back and that is what the "-Exception" means. This demonstrates how easy it is providing transactional support, declaratively, using Spring.
Now let us look at how we can wire up the Spring's JtaTransactionManager to use our choice of JTA implementation. The eventHandler bean defined above uses the transactionManager attribute, which will refer to the Spring JtaTransactionManager as shown below:

<bean id="jbossTransactionManager"
  class="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple">
 </bean>
 
 <bean id="jbossUserTransaction"
  class="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
 
 <bean id="transactionManager"  class="org.springframework.transaction.jta.JtaTransactionManager">
  <property name="transactionManager">
   <ref bean="jbossTransactionManager" />
  </property>
  <property name="userTransaction">
   <ref bean="jbossUserTransaction" />
  </property>
 </bean>

As shown above in the configuration, we are wiring up the spring provided JtaTransactionManager class to use com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple as the TransactionManager implementation and com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple
as the UserTransaction implementation.
That's all there is to it. We have just enabled XA transactions using Spring and JbossTS with MySQL datasources acting as XA resources. Please note that at the time of writing there are some known issues with MySQL XA implementation (See the resources section).
To run this use case please look at the JbossSender task in the ant build file. The project download is provided in the Resources section.

Use Case2 - Update database and send JMS message in a global transaction

*Figure 3: UseCase2 updates a database and sends a JMS message in a global transaction.*

This use case uses the same code we used for use case 1. The code updates the database mydb1 and then sends a message to a message queue in a global transaction as shown in Figure 3 above. Both Atomikosand Bitronix configurations will be dealt with, in that order.
The POJO is same as the one we used for usecase1, the EventHandler
class, and the relevant code looks as follows:

 public void handleEvent(boolean fail) throws Exception {
 
     MessageSequenceDAO dao = (MessageSequenceDAO) springContext.getBean("sequenceDAO");
     int value = 13;
     String app = "spring";
     String appKey = "execution";
     int upCnt = dao.updateSequence(value, app, appKey);
     log.debug(" sql updCnt->" + upCnt);
     
        ...
     
     if (springContext.containsBean("appSenderTemplate")) {
      this.setJmsTemplate((JmsTemplate) springContext.getBean("appSenderTemplate"));
      this.getJmsTemplate().convertAndSend("Testing 123456");
      log.debug("Sent message succesfully");
     }
     if (fail) {
      throw new RuntimeException("Simulating Rollback by throwing Exception !!");
     }
  }

The first code snippet in the above mentioned code updates the msgseq table in the database mydb1 with the sequence number. The next piece of code uses the appSenderTemplate, which is configured to use the JmsTemplate class from Spring. This template is used to send JMS messages to the message provider. We will see how this is defined in the configuration file in the following segment.
An external event, in this case the MainApp class, will invoke the method handleEvent shown in Figure 3 above.
The last if statement is for simulating a run-time exception to test if the transaction manager has successfully rolled back the global transaction.
Let us look at the JNDI bean definitions for our messaging needs in the spring configuration file:

 <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
  <property name="environment">
   <props>
    <prop key="java.naming.factory.initial">
     org.apache.activemq.jndi.ActiveMQInitialContextFactory
    </prop>
   </props>
  </property>
 </bean>
 
 <bean id="appJmsDestination"
  class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiTemplate">
   <ref bean="jndiTemplate"/>
  </property>
  <property name="jndiName" value="test.q1"/>
 </bean>
 
  <bean id="appSenderTemplate" 
    class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory">
    <ref bean="queueConnectionFactoryBean"/>
  </property>
  <property name="defaultDestination">
    <ref bean="appJmsDestination"/>
  </property>
  <property name="messageTimestampEnabled" value="false"/>
  <property name="messageIdEnabled" value="false"/>
  <!-- sessionTransacted should be true only for Atomikos -->
  <property name="sessionTransacted" value="true"/>
 </bean>

Here we are specifying a JndiTemplate for Spring to do a JNDI lookup on the destination specified by the appJmsDestination bean. The appJmsDestination
bean has been wired with the appSenderTemplate (JmsTemplate) bean as shown above. The bean definitions also show that appSenderTemplate
is wired to use the queueConnectionFactoryBean, which we will see later. For Atomikos, the sessionTransacted property should be set to "true", which is not advised by the Spring framework and the literature on JMS seem to support that viewpoint. This is a hack we need to do only for Atomikos implementation. If this is set to "false", you will notice some Heuristic Exceptions thrown during the 2PC protocol. This is mainly attributed to the prepare() call not responding on the JMS resource, and eventually, Atomikos decides to rollback resulting in a Heuristic Exception. The messageTimestampEnabled and the messageIdEnabled attributes are set to "false" so that these are not generated since we are not going to use them anyway. This will reduce the overhead on the JMS provider and improves performance.
Let us look at the spring configuration beans for Atomikos:

 <bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <constructor-arg>
           <value>tcp://localhost:61616</value>
        </constructor-arg>
 </bean>
 
 <bean id="queueConnectionFactoryBean"
    class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
    <property name="resourceName">
     <value>Execution_Q</value>
    </property>
    <property name="xaQueueConnectionFactory">
     <ref bean="xaFactory" />
    </property>
 </bean>
 
 <bean id="atomikosTransactionManager"
  class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
  <property name="forceShutdown"><value>true</value></property>
 </bean>
 
 <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
 
 <bean id="transactionManager"
  class="org.springframework.transaction.jta.JtaTransactionManager">
  <property name="transactionManager">
   <ref bean="atomikosTransactionManager" />
  </property>
  <property name="userTransaction">
   <ref bean="atomikosUserTransaction" />
  </property>
 </bean>

The xaFactory definition shows that the underlying xa connection factory being used is the org.apache.activemq.ActiveMQXAConnectionFactory class. ThetransactionManager bean is wired up to use theatomikosTransactionManager bean and theatomikosUserTransaction bean.
Let us now look at the datasource definition for Atomikos:

 <bean id="dataSource" class="com.atomikos.jdbc.SimpleDataSourceBean" init-method="init"  destroy-method="close">
  <property name="uniqueResourceName"><value>Mysql</value></property>
  <property name="xaDataSourceClassName">
   <value>com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</value>
  </property>
  <property name="xaDataSourceProperties">
   <value>URL=jdbc:mysql://127.0.0.1:3306/mydb1?user=root&password=murali</value>
  </property>
  <property name="exclusiveConnectionMode"><value>true</value></property>
 </bean>

Atomikos provides a generic wrapper class, which makes it easy to pass in the xaDataSourceClassName, which, in our case, is com.mysql.jdbc.jdbc2.optional.MysqlXADataSource. Same is the case for the JDBC url. The exclusiveConnectionMode is set to "true" to make sure that the connection in the current transaction is not shared. Atomikos provides connection pooling out of the box, and one can set the pool size using theconnectionPoolSize attribute.
Let us now look at the relevant bean definitions for Bitronix:

 <bean id="ConnectionFactory" factory-bean="ConnectionFactoryBean" factory-method="createResource" />
 <bean id="dataSourceBean1" class="bitronix.tm.resource.jdbc.DataSourceBean">
  <property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
  <property name="uniqueName" value="mysql" />
  <property name="poolSize" value="2" />
  <property name="driverProperties">
   <props>
    <prop key="user">root</prop>
    <prop key="password">murali</prop>
    <prop key="databaseName">mydb1</prop>
   </props>
  </property>
 </bean>
 <bean id="Db1DataSource" factory-bean="dataSourceBean1" factory-method="createResource" />
 <bean id="BitronixTransactionManager" factory-method="getTransactionManager"
     class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig,ConnectionFactory" destroy-method="shutdown" />
 <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  <property name="transactionManager" ref="BitronixTransactionManager" />
  <property name="userTransaction" ref="BitronixTransactionManager" />
 </bean>
 
 <bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices">
     <property name="serverId" value="spring-btm-sender" />
 </bean>

The appSenderTemplate bean defined for Atomikos can be re-used, with the only exception of the sessionTransacted value. This is set to "false", the default for the JmsTemplate in Spring anyway, so one can even ignore this attribute.

The datasource bean definition, shown above, looks similar to Atomikos, but the main difference is that the bean creation is done using an instance factory method rather than the static factory method. In this case, theDb1DataSource bean is created using the factory-beandataSourceBean1. The factory method specified wascreateResource.
The transactionManager bean refers to BitonixTransactionManager for both the transactionManagerattribute and the userTransaction attribute. To run this use case, please look at the AtomikosSender task and the BitronixSender in the ant build file, provided as part of the project download.
The sequence diagram for this use case, which is by no means a comprehensive one, is shown below:

*Figure 5: Sequence diagram, which illustrates the process flow for use case 2.*

UseCase3 - (Transactional MDP's) Consume message and update database in a global transaction

*Figure 4: UseCase3 consumes a JMS message and updates a database in a global transaction.*

This use case is different from the previous use cases and all it does is define a POJO to handle the messages received from a messaging provider. It also updates the database within the same transaction as shown in Figure 4 above.
The relevant code for our MessageHandler class looks as follows:

 public void handleOrder(String msg) {
  log.debug("Receieved message->: " + msg);
  MessageSequenceDAO dao = (MessageSequenceDAO) MainApp.springCtx.getBean("sequenceDAO");
  String app = "spring";
  String appKey = "allocation";
  int upCnt = dao.updateSequence(value++, app, appKey);
  log.debug("Update SUCCESS!! Val: " + value + " updateCnt->"+ upCnt);
  if (fail)
   throw new RuntimeException("Rollback TESTING!!");
 }

As you can see, the code just updates the database mydb2. The MessageHandler is just a POJO, which has a handleOrder() method. Using Spring we are going to transform this into a message driven POJO (analogous to MDB in a JEE server). To accomplish this we will use the MessageListenerAdapter
class, which delegates the message handling to the target listening methods via reflection. This is a very convenient feature, which enables simple POJO's to be converted to message driven POJO's (beans). Our MDP now supports distributed transactions.
It's time for us to look at the configuration for more clarity:

<bean id="msgHandler" class="com.findonnet.messaging.MessageHandler"/>
    <bean id="msgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="msgHandler"/>
        <property name="defaultListenerMethod" value="handleOrder"/>
    </bean>

The above configuration shows that the msgListener bean delegates the calls to the bean defined by the msgHandler. Also, we have specified the handleOrder(), which should be invoked when the message arrives from the message provider. This was done using the defaultListenerMethodattribute. Let us now look at the message listener, which listens to the destination on the message provider:

  <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" 
  destroy-method="close">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="queueConnectionFactoryBean" />
    <property name="destination" ref="appJmsDestination"/>
    <property name="messageListener" ref="msgListener"/>
    <property name="transactionManager" ref="transactionManager"/>
    <property name="sessionTransacted" value="false"/>
    <property name="receiveTimeout" value="5000"/>
    <property name="recoveryInterval" value="6000"/>
    <property name="autoStartup" value="false"/>
  </bean>

The listenerContainer in this case uses the DefaultMessageListenerContainer
provided by Spring. The concurrentConsumers attribute is set to "1", which indicates that we will only have one consumer on the queue. This attribute is mainly used for draining the queues concurrently by spawning multiple listeners (threads), and is very useful in situations where you have a fast producer and slow consumer and the ordering of the messages is not important. With Spring 2.0.3 and above there is support for dynamically adjusting the number of listeners based on load using themaxConcurrentConsumers attribute. The recoveryInterval attribute is used for recovery purposes and is useful when a messaging provider is down and we want to re-connect without bringing the application down. This feature, however, runs in an infinite loop and keeps re-trying for as long as the application is running, which you may not want. Also, one has to be careful in properly disposing the DMLC, since there are background threads, which might still be trying to receive messages from the message provider even after the JVM is shutdown. As of Spring 2.0.4, this issue has been fixed. As mentioned before, the sessionTransacted attribute should be set to "true" for Atomikos only. The same bean definition for listenerContainer applies to both Atomikos and Bitronix.
Please note that the transactionManger attribute points to the bean definitions that were defined above (for usecase2) and we are just re-using the same bean definitions.
That's all there is to it, we just implemented our MDP which receives the message and updates the database in a single global transaction.
To run this use case, please look at the AtomikosConsumer task and theBitronixConsumer in the ant build file, provided as part of the project download.

Some AspectJ

To intercept the calls, between the Spring framework and the JTA code, an Interceptor class has been used and weaved into the runtime libraries of the JTA implementation jar files. It uses AspectJ and the code snippet is shown below:

 pointcut xaCalls() : call(*  XAResource.*(..)) 
                   || call(*  TransactionManager.*(..))
                   || call(*  UserTransaction.*(..))
                   ;
 
  Object around() : xaCalls() {
   log.debug("XA CALL -> This: " + thisJoinPoint.getThis());
   log.debug("       -> Target: " + thisJoinPoint.getTarget());
   log.debug("       -> Signature: " + thisJoinPoint.getSignature());
   Object[] args = thisJoinPoint.getArgs();
   StringBuffer str = new StringBuffer(" ");
   for(int i=0; i< args.length; i++) {
    str.append(" [" + i + "] = " + args[i]);
   }
   log.debug(str);
   Object obj = proceed();
   log.debug("XA CALL RETURNS-> " + obj);
   return obj;
  }

The above code defines a pointcut on all calls made to any JTA related code and it also defines an around advice, which logs the arguments being passed and the method return values. This will come in handy when we are trying to trace and debug issues with JTA implementations. The antbuild.xml file in the project (see Resources) contains tasks to weave the aspect against the JTA implementations. Another option is to use the MaintainJ plugin for eclipse, which provides the same from the comfort of an IDE (Eclipse) and even generates the sequence diagram for the process flow.

Some Gotchas

Distributed transactions is a very complex topic and one should look out for implementations where transaction recovery is robust enough and provides all the ACID (Atomicity, Consistency, Isolation and Durability) criteria that the user or application expects. What we tested, in this article, was for pre-2PC exceptions (remember the RuntimeException we were throwing to test rollbacks? ). Applications should thoroughly test JTA implementations for failures during the 2 phase commit process as they are the most crucial and troublesome.
All the JTA implementations we looked at provide recovery test cases, which make it easy to run against the implementation itself, and on the participating XA resources as well. Please note that using XA may turn out to be a huge performance concern especially when the transaction volumes are large. One should also look at support for 2PC optimizations like the "last resource commit", which might fit some application needs where only one of the participating resource cannot or need not support 2PC. Care should be taken about the XA features supported and restrictions imposed, if any, by the vendors of the database or the message provider. For example, MySQL doesn't support suspend() and resume() operations and also seems to have some restrictions on using XA and in some situations might even keep the data in an in-consistent state. To learn more about XA, Principles of Transaction Processing is a very good book, which covers the 2PC failure conditions and optimizations in great detail. Also, Mike Spille's blog (see Resources section) is another good resource, which focuses on XA within the JTA context and provides wealth of information, especially on failures during 2PC and helps understand more about XA transactions.
When using Spring framework for sending and receiving JMS messages, one should be wary of using the JmsTemplate and the DefualtMessageListenerContainer when running in a non-J(2)EE environment. In case of JmsTemplate, for every message that is sent there will be a new JMS connection created. Same is the case when using the DefaultMessageListenerContainer when receiving messages. Creating a heavy weight JMS connection is very resource-intensive and the application may not scale well under heavy loads. One option is to look for some sort of connection/session pooling support either from the JMS providers or third-party libraries. Another option is to use the SingleConnnectionFactory from Spring, which makes it easy to configure a single connection, which can be re-used. The sessions are, however created for every message being sent, and this may not be a real overhead since JMS sessions are lightweight. Same is the case when messages are being received irrespective of if they are transactional or not.

Conclusion

In this article we saw how Spring framework can be integrated with JTA implementations to provide distributed transactions and how it could cater to the needs of an application which required distributed transactions without the need for a full-blown JEE server. The article also show-cased how Spring framework provided POJO based solutions and declarative transaction management with minimal intrusion while promoting best design practices. The use cases we saw, also demonstrated how Spring provides us with a rich transaction management abstraction, enabling us to easily switch between different JTA providers seamlessly.

Author bio

Murali Kosaraju works as a technical architect at Wachovia Bank in Charlotte, North Carolina. He holds a masters degree in Systems and Information engineering and his interests include messaging, service oriented architectures (SOA), Web Services, JEE and .NET centric applications. He currently lives with his wife Vidya and son Vishal in South Carolina.

Learn more about this topic

Download the source code (8M Zip file).
X/Open XA
JTA API
Spring Framework
ActiveMQ
Mike Spille's Blog
MySQL XA issues
JTA Implementations:

  1. JBossTS
  2. Atomikos
  3. Bitronix
    Tibco Software
    Principles of Transaction Processing
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容