JTA/XA transactions without the J2EE container.
Using a J(2)EE application server has been the norm when high-end features like transactions, security, availability, and scalability are mandatory. There are very few options for Java applications that require only a subset of these enterprise features and, more often than not, organizations go for a full-blown J(2)EE server. This article focuses on distributed transactions using JTA (the Java Transaction API) and elaborates on how distributed transactions (also called XA transactions) can be used in a standalone Java application, without a JEE server, using the widely popular Spring framework and the open source JTA implementations JBossTS, Atomikos and Bitronix.
Distributed transaction processing systems are designed to facilitate transactions that span heterogeneous, transaction-aware resources in a distributed environment. Using distributed transactions, an application can accomplish tasks such as retrieving a message from a message queue and updating one or more databases in a single transactional unit adhering to the ACID (Atomicity, Consistency, Isolation and Durability) criteria. This article outlines some of the use cases where distributed transactions (XA) could be used and how an application can achieve transactional processing using JTA along with best-of-breed technologies. The main focus is on using Spring as a server framework and on how one can integrate various JTA implementations seamlessly for enterprise-level distributed transactions.
XA Transactions and the JTA API
Since the scope of the article is limited to using JTA implementations with the Spring framework, we will only briefly touch upon the architectural concepts of distributed transaction processing.
XA Transactions
The X/Open Distributed Transaction Processing model, designed by the Open Group (a vendor consortium), defines a standard communication architecture that allows multiple applications to share resources provided by multiple resource managers, and allows their work to be coordinated into global transactions. The XA interfaces enable the resource managers to join transactions, to perform 2PC (two-phase commit) and to recover in-doubt transactions following a failure.
As shown in Figure 1, the model has the following interfaces:
- The interface between the application and the resource manager allows an application to call a resource manager directly, using the resource manager's native API or native XA API, depending on whether the transaction needs to be managed by the transaction monitor or not.
- The interface between the application and the transaction monitor (the TX interface) lets the application call the transaction monitor for all transaction needs, such as starting, ending, or rolling back a transaction.
- The interface between the transaction monitor and the resource manager is the XA interface. This is the interface that facilitates the two-phase commit protocol to achieve distributed transactions under one global transaction.
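To make the two-phase commit idea concrete, here is a minimal sketch in plain Java. The Participant interface and the RecordingParticipant class are hypothetical stand-ins for resource managers, invented for this illustration; they are not part of XA or JTA, and a real transaction monitor also handles logging, timeouts and recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of two-phase commit. Participant is a hypothetical stand-in
// for a resource manager; it is not an XA or JTA type.
class TwoPhaseCommitSketch {

    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2: make the work permanent
        void rollback();     // phase 2: undo the work
    }

    // Records every call so the sequence can be inspected afterwards.
    static class RecordingParticipant implements Participant {
        final String name;
        final boolean vote;
        final List<String> events;

        RecordingParticipant(String name, boolean vote, List<String> events) {
            this.name = name;
            this.vote = vote;
            this.events = events;
        }

        public boolean prepare() { events.add(name + ":prepare"); return vote; }
        public void commit()     { events.add(name + ":commit"); }
        public void rollback()   { events.add(name + ":rollback"); }
    }

    // Returns true if the global transaction committed, false if it rolled back.
    static boolean complete(List<Participant> participants) {
        boolean allYes = true;
        for (Participant p : participants) {      // phase 1: collect votes
            if (!p.prepare()) {
                allYes = false;
                break;                            // one "no" decides the outcome
            }
        }
        for (Participant p : participants) {      // phase 2: apply the decision
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        List<Participant> participants = List.of(
                new RecordingParticipant("db", true, events),
                new RecordingParticipant("jms", false, events));
        System.out.println("committed=" + complete(participants) + " events=" + events);
    }
}
```

In this run the second participant votes no, so both participants are rolled back even though the first one was ready to commit.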
JTA API
The JTA API, defined by Sun Microsystems, is a high-level API which defines interfaces between a transaction manager and the parties involved in a distributed transaction system. The JTA primarily consists of three parts:
- A high-level application interface for an application to demarcate transaction boundaries. The UserTransaction interface encapsulates this.
- A Java mapping of the industry standard X/Open XA protocol (Item #3 in the X/Open interfaces listed above). This encompasses the types defined in the javax.transaction.xa package, which consists of the XAResource and Xid interfaces and the XAException class.
- A high-level transaction manager interface that allows an application server to manage transactions for a user application. The TransactionManager, Transaction, Status and Synchronization interfaces pretty much define how the application server manages transactions.
Now that we have a brief summary of what the JTA and XA standards are, let us go through some use cases to demonstrate the integration of different JTA implementations using Spring, for our hypothetical Java application.
Our Use Cases
To demonstrate the integration of different JTA implementations with Spring, we are going to use the following use cases:
- Update two database resources in a global transaction: We will use JBossTS as the JTA implementation. In the process, we will see how we can declaratively apply distributed transaction semantics to simple POJOs.
- Update a database and send a JMS message to a queue in a global transaction: We will demonstrate integration with both Atomikos and Bitronix JTA implementations.
- Consume a JMS message and update a database in a global transaction: We will use both Atomikos and Bitronix JTA implementations. In the process, we will see how we can emulate transactional MDPs (Message Driven POJOs).
We will be using MySQL for the databases and Apache ActiveMQ as our JMS messaging provider for our use cases. Before going through the use cases, let us briefly look at the technology stack we are going to use.
Spring framework
Spring framework has established itself as one of the most useful and productive frameworks in the Java world. Among the many benefits it provides, it also provides the necessary plumbing for running an application with any JTA implementation. This makes it unique in the sense that an application doesn't need to run in a JEE container to get the benefits of JTA transactions. Please note that Spring doesn't provide any JTA implementation as such. The only task from the user perspective is to make sure that the JTA implementation is wired to use the Spring framework's JTA support. This is what we will be focusing on in the following sections.
Transactions in Spring
Spring provides both programmatic and declarative transaction management using a lightweight transaction framework. This makes it easy for standalone Java applications to include transactions (JTA or non-JTA) either programmatically or declaratively. Programmatic transaction demarcation can be accomplished by using the API exposed by the PlatformTransactionManager interface and its implementations. Declarative transaction demarcation, on the other hand, uses an AOP (Aspect Oriented Programming) based solution. For this article, we will explore declarative transaction demarcation using the TransactionProxyFactoryBean class, since it is less intrusive and easy to understand. The transaction management strategy, in our case, is to use the JtaTransactionManager, since we have multiple resources to deal with. If there is only a single resource, there are several choices depending on the underlying technology, and all of them implement the PlatformTransactionManager interface. For example, for Hibernate one can use the HibernateTransactionManager, and for JDO-based persistence one can use the JdoTransactionManager. There is also a JmsTransactionManager, which is meant for local transactions only.
Spring's transaction framework also provides the necessary tools for applications to define the transaction propagation behavior, transaction isolation and so forth. For declarative transaction management, the TransactionDefinition interface specifies the propagation behavior, which is very similar to the EJB CMT attributes. The TransactionAttribute interface allows the application to specify which exceptions will cause a rollback and which ones will still result in a commit. These two interfaces are what make declarative transaction management easy to use and configure, as we will see when we go through our use cases.
Asynchronous Message Consumption using Spring
Spring has always supported sending messages using the JMS API via its JMS abstraction layer. It employs a callback mechanism, which consists of a message creator and a JMS template that actually sends the message created by the message creator.
Since the release of Spring 2.0, asynchronous message consumption has also been possible using the JMS API. Though Spring provides several message listener containers for consuming messages, the one best suited to both JEE and J2SE environments is the DefaultMessageListenerContainer (DMLC). The DefaultMessageListenerContainer extends the AbstractPollingMessageListenerContainer class and provides full support for the JMS 1.1 API. It primarily uses JMS synchronous receive calls (MessageConsumer.receive()) inside a loop and allows for transactional reception of messages. In a J(2)SE environment, standalone JTA implementations can be wired into Spring's JtaTransactionManager, which will be demonstrated in the following sections.
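The receive-inside-a-loop idea can be sketched with a plain in-memory queue. This is a simplified simulation and not Spring's implementation: the poll method, its parameters and the put-back-on-failure behavior are assumptions made for illustration, with a BlockingQueue standing in for the JMS destination and a Consumer standing in for the message listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simulation of a polling, transactional receive loop. Not Spring code.
class PollingReceiveLoopSketch {

    // Polls the queue maxPolls times. Each received message is treated as one
    // transaction: if the listener throws, the message is put back on the queue,
    // which simulates a rollback followed by redelivery.
    static int poll(BlockingQueue<String> queue, Consumer<String> listener,
                    long receiveTimeoutMs, int maxPolls) throws InterruptedException {
        int committed = 0;
        for (int i = 0; i < maxPolls; i++) {
            // A real container would begin a transaction here.
            String msg = queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) {
                continue;               // receive timed out, nothing to do
            }
            try {
                listener.accept(msg);
                committed++;            // "commit": the message stays consumed
            } catch (RuntimeException e) {
                queue.put(msg);         // "rollback": the message becomes available again
            }
        }
        return committed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2"));
        List<String> handled = new ArrayList<>();
        boolean[] failedOnce = {false};
        int committed = poll(queue, msg -> {
            if (msg.equals("m1") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new RuntimeException("simulated failure");
            }
            handled.add(msg);
        }, 10, 5);
        System.out.println(committed + " committed, handled=" + handled);
    }
}
```

The first attempt to handle m1 fails, so it is redelivered after m2; this reordering is one reason message ordering cannot be assumed when rollbacks occur.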
The JTA implementations
JBossTS
JBossTS, formerly known as Arjuna Transaction Service, comes with a very robust implementation, which supports both the JTA and JTS APIs. JBossTS comes with a recovery service, which can be run as a separate process from your application processes. Unfortunately, it doesn't support out-of-the-box integration with Spring, but it is easy to integrate, as we will see in our exercise. Also, there is no support for JMS resources; only database resources are supported.
Atomikos Transaction Essentials
Atomikos's JTA implementation has been open sourced very recently. The documentation and literature on the internet suggest that it is a production-quality implementation, which also supports recovery and some exotic features beyond the JTA API. Atomikos provides out-of-the-box Spring integration along with some nice examples. Atomikos supports both database and JMS resources. It also provides support for pooled connections for both database and JMS resources.
Bitronix JTA
Bitronix's JTA implementation is fairly new and is still in beta. It also claims to support transaction recovery as well as, or even better than, some of the commercial products. Bitronix provides support for both database and JMS resources. Bitronix also provides connection pooling and session pooling out of the box.
XA Resources
JMS Resources
The JMS API specification does not require that a provider supports distributed transactions, but if the provider does, it should be done via the JTA XAResource API. So the provider should expose its JTA support using the XAConnectionFactory, XAConnection and XASession interfaces. Fortunately Apache's ActiveMQ provides the necessary implementation for handling XA transactions. Our project (see Resources section) also includes configuration files for using TIBCO EMS (JMS server from TIBCO) and one can notice that the configuration files require minimal changes when the providers are switched.
Database Resources
The MySQL database provides an XA implementation, which works only with the InnoDB storage engine. It also provides a decent JDBC driver, which supports the JTA/XA protocol. Though there are some restrictions placed on the usage of some XA features, it is good enough for the purposes of this article.
The Environment
Setup for Databases:
The first database mydb1 will be used for use cases 1 and 2 and will have the following table:
mysql> use mydb1;
Database changed
mysql> select * from msgseq;
+---------+-----------+-------+
| APPNAME | APPKEY | VALUE |
+---------+-----------+-------+
| spring | execution | 13 |
+---------+-----------+-------+
1 row in set (0.00 sec)
The second database mydb2 will be used for use cases 1 and 3 and will have the following table:
mysql> use mydb2;
Database changed
mysql> select * from msgseq;
+---------+------------+-------+
| APPNAME | APPKEY | VALUE |
+---------+------------+-------+
| spring | aaaaa | 15 |
| spring | allocation | 13 |
+---------+------------+-------+
2 rows in set (0.00 sec)
Setup for JMS provider (for use cases 2 and 3):
For creating a physical destination in ActiveMQ, do the following:
- Add the following destination to the activemq.xml file under the conf folder of the ActiveMQ installation:
<destinations>
<queue physicalName="test.q1" />
</destinations>
- Add the following line to the jndi.properties file to include the JNDI name for the destination, and make sure the file is on the classpath:
queue.test.q1=test.q1
Use Case1 - Update two databases in a global transaction
Let us assume that our application has a requirement where it needs to persist a sequence number, associated with an event, in two different databases (mydb1 and mydb2), within the same transactional unit of work as shown in Figure 2 above. To achieve this, let us write a simple method in our POJO class, which updates the two databases. The code for our EventHandler POJO class looks as follows:
public void handleEvent(boolean fail) throws Exception {
    MessageSequenceDAO dao = (MessageSequenceDAO) springContext.getBean("sequenceDAO");
    int value = 13;
    String app = "spring";
    String appKey = "execution";
    int upCnt = dao.updateSequence(value, app, appKey);
    log.debug(" sql updCnt->" + upCnt);
    if (springContext.containsBean("sequenceDAO2")) {
        // this is for use case 1 with JBossTS
        MessageSequenceDAO dao2 = (MessageSequenceDAO) springContext.getBean("sequenceDAO2");
        appKey = "allocation";
        upCnt = dao2.updateSequence(value, app, appKey);
        log.debug(" sql updCnt2->" + upCnt);
    }
    ...
    if (fail) {
        throw new RuntimeException("Simulating Rollback by throwing Exception !!");
    }
}
The first segment of the code gets a reference to the MessageSequenceDAO object representing the first database and updates the value of the sequence.
The next piece of code updates the sequence in the second database. The last if statement throws a run-time exception when we run the code with the boolean value set to "true". This simulates a run-time exception so that we can test whether the transaction manager has successfully rolled back the global transaction.
Let us look at the spring configuration file to see how we configured our DAO classes and the datasources:
<bean id="dsProps" class="java.util.Properties">
<constructor-arg>
<props>
<prop key="user">root</prop>
<prop key="password">murali</prop>
<prop key="DYNAMIC_CLASS">com.findonnet.service.transaction.jboss.jdbc.Mysql</prop>
</props>
</constructor-arg>
</bean>
<bean id="dataSource1" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName">
<value>com.arjuna.ats.jdbc.TransactionalDriver</value>
</property>
<property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb1"/>
<property name="connectionProperties">
<ref bean="dsProps"/>
</property>
</bean>
<bean id="dataSource2" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName">
<value>com.arjuna.ats.jdbc.TransactionalDriver</value>
</property>
<property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb2"/>
<property name="connectionProperties">
<ref bean="dsProps"/>
</property>
</bean>
<bean id="sequenceDAO" class="com.findonnet.persistence.MessageSequenceDAO">
<property name="dataSource">
<ref bean="dataSource1"/>
</property>
</bean>
<bean id="sequenceDAO2" class="com.findonnet.persistence.MessageSequenceDAO">
<property name="dataSource">
<ref bean="dataSource2"/>
</property>
</bean>
The above beans define the two datasources for the two databases. To use JBossTS's TransactionalDriver, we need to register the database either through JNDI bindings or through dynamic class instantiation. We will be using the latter, which requires us to implement the DynamicClass interface. The bean definition for dsProps shows that we are going to use the com.findonnet.service.transaction.jboss.jdbc.Mysql class, which we wrote and which implements the DynamicClass interface. We had to do this because JBossTS doesn't provide any out-of-the-box wrappers for the MySQL database. In addition, the code relies on the JDBC URL starting with "jdbc:arjuna"; otherwise the JBossTS code throws errors.
The implementation of the DynamicClass interface is very simple: all we have to do is implement the getDataSource() and shutdownDataSource() methods. The getDataSource() method returns an appropriate XADataSource object, which in our case is the com.mysql.jdbc.jdbc2.optional.MysqlXADataSource.
Please note that both datasources are configured to use the DriverManagerDataSource from Spring, which doesn't use any connection pooling and is not recommended for production use. The alternative is to use a pooled datasource that can handle XA datasource pooling.
It's now time for us to look at providing transactional semantics to the handleEvent() method in our EventHandler class:
<bean id="eventHandlerTarget" class="com.findonnet.messaging.EventHandler"></bean>
<bean id="eventHandler" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager"><ref bean="transactionManager" /></property>
<property name="target"><ref bean="eventHandlerTarget"/></property>
<property name="transactionAttributes">
<props>
<prop key="handle*">PROPAGATION_REQUIRED,-Exception</prop>
</props>
</property>
</bean>
Here we define the TransactionProxyFactoryBean and pass in the transactionManager reference, which we will see later, along with the transaction attributes "PROPAGATION_REQUIRED,-Exception". These are applied to the target bean eventHandlerTarget, and only to methods whose names start with "handle" (notice the handle* key). In simple words, we are asking the Spring framework to apply the transaction attributes "PROPAGATION_REQUIRED,-Exception" to every invocation of a method on the target object whose name starts with "handle". Behind the scenes, the Spring framework will create a CGLIB based proxy, which intercepts all calls on the EventHandler for method names that start with "handle". If any Exception is thrown within the method call, the current transaction will be rolled back; that is what "-Exception" means. This demonstrates how easy it is to provide transactional support declaratively using Spring.
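The effect of a rollback rule such as "-Exception" can be modelled with a short sketch. This is a simplified model and not Spring's code: the rollbackOn and depth helpers are hypothetical, and matching is done on simple class names only. It assumes the commonly documented behavior that a "-" prefix marks a rollback rule, a "+" prefix marks a commit rule, the closest match in the exception's class hierarchy wins, and without any matching rule only unchecked exceptions and errors cause a rollback.

```java
import java.io.IOException;

// Simplified model of rollback rules in a transaction attribute string.
class RollbackRuleSketch {

    // Returns true if the exception should roll the transaction back.
    static boolean rollbackOn(String attributes, Throwable ex) {
        int bestDepth = Integer.MAX_VALUE;
        Boolean decision = null;
        for (String raw : attributes.split(",")) {
            String token = raw.trim();
            if (!token.startsWith("-") && !token.startsWith("+")) {
                continue;                       // e.g. PROPAGATION_REQUIRED
            }
            int depth = depth(ex.getClass(), token.substring(1));
            if (depth >= 0 && depth < bestDepth) {
                bestDepth = depth;              // the closest match wins
                decision = token.startsWith("-");
            }
        }
        if (decision != null) {
            return decision;
        }
        // No rule matched: only unchecked exceptions and errors roll back.
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // Distance from the type up to the first superclass with the given simple
    // name, or -1 if no class in the hierarchy matches.
    static int depth(Class<?> type, String simpleName) {
        int d = 0;
        for (Class<?> c = type; c != null; c = c.getSuperclass(), d++) {
            if (c.getSimpleName().equals(simpleName)) {
                return d;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Throwable checked = new IOException("disk full");
        System.out.println(rollbackOn("PROPAGATION_REQUIRED,-Exception", checked));
        System.out.println(rollbackOn("PROPAGATION_REQUIRED", checked));
    }
}
```

Under this model, a checked exception such as IOException rolls the transaction back only when the "-Exception" rule is present, which is why the attribute matters for a method declared with throws Exception.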
Now let us look at how we can wire up the Spring's JtaTransactionManager to use our choice of JTA implementation. The eventHandler bean defined above uses the transactionManager attribute, which will refer to the Spring JtaTransactionManager as shown below:
<bean id="jbossTransactionManager"
class="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple">
</bean>
<bean id="jbossUserTransaction"
class="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="jbossTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="jbossUserTransaction" />
</property>
</bean>
As shown in the configuration above, we are wiring up the Spring-provided JtaTransactionManager class to use com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple as the TransactionManager implementation and com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple as the UserTransaction implementation.
That's all there is to it. We have just enabled XA transactions using Spring and JBossTS, with MySQL datasources acting as XA resources. Please note that at the time of writing there are some known issues with the MySQL XA implementation (see the Resources section).
To run this use case please look at the JbossSender task in the ant build file. The project download is provided in the Resources section.
Use Case2 - Update database and send JMS message in a global transaction
This use case uses the same code we used for use case 1. The code updates the database mydb1 and then sends a message to a message queue in a global transaction as shown in Figure 3 above. Both the Atomikos and Bitronix configurations will be dealt with, in that order.
The POJO is the same EventHandler class we used for use case 1, and the relevant code looks as follows:
public void handleEvent(boolean fail) throws Exception {
    MessageSequenceDAO dao = (MessageSequenceDAO) springContext.getBean("sequenceDAO");
    int value = 13;
    String app = "spring";
    String appKey = "execution";
    int upCnt = dao.updateSequence(value, app, appKey);
    log.debug(" sql updCnt->" + upCnt);
    ...
    if (springContext.containsBean("appSenderTemplate")) {
        this.setJmsTemplate((JmsTemplate) springContext.getBean("appSenderTemplate"));
        this.getJmsTemplate().convertAndSend("Testing 123456");
        log.debug("Sent message successfully");
    }
    if (fail) {
        throw new RuntimeException("Simulating Rollback by throwing Exception !!");
    }
}
The first code snippet in the above mentioned code updates the msgseq table in the database mydb1 with the sequence number. The next piece of code uses the appSenderTemplate, which is configured to use the JmsTemplate class from Spring. This template is used to send JMS messages to the message provider. We will see how this is defined in the configuration file in the following segment.
An external event, in this case the MainApp class, will invoke the method handleEvent shown in Figure 3 above.
The last if statement is for simulating a run-time exception to test if the transaction manager has successfully rolled back the global transaction.
Let us look at the JNDI bean definitions for our messaging needs in the spring configuration file:
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.apache.activemq.jndi.ActiveMQInitialContextFactory
</prop>
</props>
</property>
</bean>
<bean id="appJmsDestination"
class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate"/>
</property>
<property name="jndiName" value="test.q1"/>
</bean>
<bean id="appSenderTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="queueConnectionFactoryBean"/>
</property>
<property name="defaultDestination">
<ref bean="appJmsDestination"/>
</property>
<property name="messageTimestampEnabled" value="false"/>
<property name="messageIdEnabled" value="false"/>
<!-- sessionTransacted should be true only for Atomikos -->
<property name="sessionTransacted" value="true"/>
</bean>
Here we are specifying a JndiTemplate for Spring to do a JNDI lookup on the destination specified by the appJmsDestination bean. The appJmsDestination bean has been wired into the appSenderTemplate (JmsTemplate) bean as shown above. The bean definitions also show that appSenderTemplate is wired to use the queueConnectionFactoryBean, which we will see later. For Atomikos, the sessionTransacted property should be set to "true", which is not advised by the Spring framework, and the literature on JMS seems to support that viewpoint. This is a hack needed only for the Atomikos implementation. If it is set to "false", you will notice some heuristic exceptions thrown during the 2PC protocol. This is mainly because the prepare() call does not respond on the JMS resource and, eventually, Atomikos decides to roll back, resulting in a heuristic exception. The messageTimestampEnabled and messageIdEnabled attributes are set to "false" so that these values are not generated, since we are not going to use them anyway. This reduces the overhead on the JMS provider and improves performance.
Let us look at the spring configuration beans for Atomikos:
<bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<constructor-arg>
<value>tcp://localhost:61616</value>
</constructor-arg>
</bean>
<bean id="queueConnectionFactoryBean"
class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
<property name="resourceName">
<value>Execution_Q</value>
</property>
<property name="xaQueueConnectionFactory">
<ref bean="xaFactory" />
</property>
</bean>
<bean id="atomikosTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown"><value>true</value></property>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="transactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction" />
</property>
</bean>
The xaFactory definition shows that the underlying XA connection factory being used is the org.apache.activemq.ActiveMQXAConnectionFactory class. The transactionManager bean is wired up to use the atomikosTransactionManager bean and the atomikosUserTransaction bean.
Let us now look at the datasource definition for Atomikos:
<bean id="dataSource" class="com.atomikos.jdbc.SimpleDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName"><value>Mysql</value></property>
<property name="xaDataSourceClassName">
<value>com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</value>
</property>
<property name="xaDataSourceProperties">
<value>URL=jdbc:mysql://127.0.0.1:3306/mydb1?user=root&amp;password=murali</value>
</property>
<property name="exclusiveConnectionMode"><value>true</value></property>
</bean>
Atomikos provides a generic wrapper class, which makes it easy to pass in the xaDataSourceClassName, which in our case is com.mysql.jdbc.jdbc2.optional.MysqlXADataSource. The same is true for the JDBC URL. The exclusiveConnectionMode is set to "true" to make sure that the connection in the current transaction is not shared. Atomikos provides connection pooling out of the box, and one can set the pool size using the connectionPoolSize attribute.
Let us now look at the relevant bean definitions for Bitronix:
<bean id="ConnectionFactory" factory-bean="ConnectionFactoryBean" factory-method="createResource" />
<bean id="dataSourceBean1" class="bitronix.tm.resource.jdbc.DataSourceBean">
<property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="uniqueName" value="mysql" />
<property name="poolSize" value="2" />
<property name="driverProperties">
<props>
<prop key="user">root</prop>
<prop key="password">murali</prop>
<prop key="databaseName">mydb1</prop>
</props>
</property>
</bean>
<bean id="Db1DataSource" factory-bean="dataSourceBean1" factory-method="createResource" />
<bean id="BitronixTransactionManager" factory-method="getTransactionManager"
class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig,ConnectionFactory" destroy-method="shutdown" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="BitronixTransactionManager" />
<property name="userTransaction" ref="BitronixTransactionManager" />
</bean>
<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices">
<property name="serverId" value="spring-btm-sender" />
</bean>
The appSenderTemplate bean defined for Atomikos can be re-used, with the only exception of the sessionTransacted value. This is set to "false", the default for the JmsTemplate in Spring anyway, so one can even ignore this attribute.
The datasource bean definition, shown above, looks similar to the one for Atomikos, but the main difference is that the bean creation is done using an instance factory method rather than a static factory method. In this case, the Db1DataSource bean is created using the factory-bean dataSourceBean1. The factory method specified is createResource.
The transactionManager bean refers to BitronixTransactionManager for both the transactionManager attribute and the userTransaction attribute. To run this use case, please look at the AtomikosSender task and the BitronixSender task in the ant build file, provided as part of the project download.
The sequence diagram for this use case, which is by no means a comprehensive one, is shown below:
UseCase3 - (Transactional MDP's) Consume message and update database in a global transaction
This use case is different from the previous use cases and all it does is define a POJO to handle the messages received from a messaging provider. It also updates the database within the same transaction as shown in Figure 4 above.
The relevant code for our MessageHandler class looks as follows:
public void handleOrder(String msg) {
    log.debug("Received message->: " + msg);
    MessageSequenceDAO dao = (MessageSequenceDAO) MainApp.springCtx.getBean("sequenceDAO");
    String app = "spring";
    String appKey = "allocation";
    int upCnt = dao.updateSequence(value++, app, appKey);
    log.debug("Update SUCCESS!! Val: " + value + " updateCnt->" + upCnt);
    if (fail)
        throw new RuntimeException("Rollback TESTING!!");
}
As you can see, the code just updates the database mydb2. The MessageHandler is just a POJO, which has a handleOrder() method. Using Spring, we are going to transform this into a message driven POJO (analogous to an MDB in a JEE server). To accomplish this we will use the MessageListenerAdapter class, which delegates the message handling to the target listener methods via reflection. This is a very convenient feature, which enables simple POJOs to be converted into message driven POJOs (beans). Our MDP now supports distributed transactions.
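The delegation-by-reflection idea can be sketched in a few lines of plain Java. This is not the MessageListenerAdapter source: the dispatch helper and the OrderHandler class are hypothetical, and the sketch assumes the message payload has already been extracted from the JMS message as a plain object.

```java
import java.lang.reflect.Method;

// Sketch of delegating a message to a named method on a POJO via reflection.
class ListenerAdapterSketch {

    // Hypothetical handler POJO with no messaging dependencies.
    public static class OrderHandler {
        String lastOrder;

        public void handleOrder(String msg) {
            lastOrder = msg;
        }
    }

    // Looks up a public method by name and payload type, then invokes it.
    static void dispatch(Object delegate, String methodName, Object payload)
            throws ReflectiveOperationException {
        Method method = delegate.getClass().getMethod(methodName, payload.getClass());
        method.invoke(delegate, payload);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        OrderHandler handler = new OrderHandler();
        dispatch(handler, "handleOrder", "order-42");
        System.out.println("handled: " + handler.lastOrder);
    }
}
```

Because the handler is found by method name at run time, the POJO needs no JMS interfaces; the trade-off is that a misspelled method name only fails when the first message arrives.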
It's time for us to look at the configuration for more clarity:
<bean id="msgHandler" class="com.findonnet.messaging.MessageHandler"/>
<bean id="msgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="msgHandler"/>
<property name="defaultListenerMethod" value="handleOrder"/>
</bean>
The above configuration shows that the msgListener bean delegates the calls to the bean defined by msgHandler. We have also specified handleOrder() as the method that should be invoked when a message arrives from the message provider. This was done using the defaultListenerMethod attribute. Let us now look at the message listener container, which listens to the destination on the message provider:
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
destroy-method="close">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="queueConnectionFactoryBean" />
<property name="destination" ref="appJmsDestination"/>
<property name="messageListener" ref="msgListener"/>
<property name="transactionManager" ref="transactionManager"/>
<property name="sessionTransacted" value="false"/>
<property name="receiveTimeout" value="5000"/>
<property name="recoveryInterval" value="6000"/>
<property name="autoStartup" value="false"/>
</bean>
The listenerContainer in this case uses the DefaultMessageListenerContainer provided by Spring. The concurrentConsumers attribute is set to "1", which indicates that we will only have one consumer on the queue. This attribute is mainly used for draining the queues concurrently by spawning multiple listeners (threads), and is very useful in situations where you have a fast producer and a slow consumer and the ordering of the messages is not important. With Spring 2.0.3 and above, there is support for dynamically adjusting the number of listeners based on load using the maxConcurrentConsumers attribute. The recoveryInterval attribute is used for recovery purposes and is useful when a messaging provider is down and we want to re-connect without bringing the application down. This feature, however, runs in an infinite loop and keeps re-trying for as long as the application is running, which you may not want. Also, one has to be careful to dispose of the DMLC properly, since there are background threads which might still be trying to receive messages from the message provider even while the application is shutting down. As of Spring 2.0.4, this issue has been fixed. As mentioned before, the sessionTransacted attribute should be set to "true" for Atomikos only. The same bean definition for listenerContainer applies to both Atomikos and Bitronix.
Please note that the transactionManager attribute points to the bean definitions that were defined above (for use case 2), and we are just re-using the same bean definitions.
That's all there is to it, we just implemented our MDP which receives the message and updates the database in a single global transaction.
To run this use case, please look at the AtomikosConsumer task and theBitronixConsumer in the ant build file, provided as part of the project download.
Some AspectJ
To intercept the calls between the Spring framework and the JTA code, an Interceptor class has been used and woven into the runtime libraries of the JTA implementation jar files. It uses AspectJ, and the code snippet is shown below:
pointcut xaCalls() : call(* XAResource.*(..))
    || call(* TransactionManager.*(..))
    || call(* UserTransaction.*(..));

Object around() : xaCalls() {
    log.debug("XA CALL -> This: " + thisJoinPoint.getThis());
    log.debug(" -> Target: " + thisJoinPoint.getTarget());
    log.debug(" -> Signature: " + thisJoinPoint.getSignature());
    Object[] args = thisJoinPoint.getArgs();
    StringBuffer str = new StringBuffer(" ");
    for (int i = 0; i < args.length; i++) {
        str.append(" [" + i + "] = " + args[i]);
    }
    log.debug(str);
    Object obj = proceed();
    log.debug("XA CALL RETURNS-> " + obj);
    return obj;
}
The above code defines a pointcut on all calls made to any JTA-related code, and it also defines an around advice, which logs the arguments being passed and the method return values. This comes in handy when we are trying to trace and debug issues with JTA implementations. The ant build.xml file in the project (see Resources) contains tasks to weave the aspect against the JTA implementations. Another option is to use the MaintainJ plugin for Eclipse, which provides the same tracing from the comfort of the IDE and even generates a sequence diagram of the process flow.
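Where weaving the JTA jars is not practical, a similar trace can be approximated in plain Java with a dynamic proxy. The sketch below is only an illustration of the same "log arguments, proceed, log the return value" idea as the around advice above. DemoResource, NoOpResource and CallTracer are hypothetical names invented for this example and are not part of JTA, Spring or the project download. Unlike the aspect, a proxy only sees calls made through the wrapped reference.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for an XA-style interface; not part of JTA. */
interface DemoResource {
    int prepare(String txId);
    void commit(String txId);
}

/** Hypothetical do-nothing implementation used only to have something to trace. */
final class NoOpResource implements DemoResource {
    public int prepare(String txId) { return 0; }
    public void commit(String txId) { }
}

/** Records every call and return value, then delegates to the real target. */
final class CallTracer implements InvocationHandler {
    private final Object target;
    final List<String> log = new ArrayList<>();

    CallTracer(Object target) {
        this.target = target;
    }

    /** Creates a proxy of the given interface type whose calls go through the tracer. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> type, CallTracer tracer) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, tracer);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object[] shown = (args == null) ? new Object[0] : args;
        log.add("CALL -> " + method.getName() + Arrays.toString(shown));
        try {
            Object result = method.invoke(target, args); // plays the role of proceed()
            log.add("RETURNS -> " + result);
            return result;
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow the target's own exception
        }
    }
}

final class CallTracerDemo {
    public static void main(String[] argv) {
        CallTracer tracer = new CallTracer(new NoOpResource());
        DemoResource traced = CallTracer.wrap(DemoResource.class, tracer);
        traced.prepare("tx-1");
        traced.commit("tx-1");
        tracer.log.forEach(System.out::println);
    }
}
```

The AspectJ approach remains the better fit for the purpose described in this article, since it also captures calls made internally by the JTA implementation, which no proxy around a single reference can observe.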
Some Gotchas
Distributed transaction processing is a very complex topic, and one should look for implementations whose transaction recovery is robust and which provide all the ACID (Atomicity, Consistency, Isolation and Durability) guarantees that the user or application expects. What we tested in this article was pre-2PC exceptions (remember the RuntimeException we were throwing to test rollbacks?). Applications should thoroughly test JTA implementations for failures during the two-phase commit process, as these are the most crucial and troublesome.
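To make the distinction between pre-2PC failures and failures during 2PC concrete, here is a toy model of a coordinator. It is a minimal sketch under simplifying assumptions, not how JBossTS, Atomikos or Bitronix are implemented: ToyParticipant and ToyCoordinator are hypothetical classes, there is no transaction log, and failures are injected with simple flags. It shows that a failure in the prepare phase can still be rolled back everywhere, whereas a failure in the commit phase leaves a participant prepared but not committed (in doubt), which only recovery can resolve.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical participant in a toy two-phase commit; not a JTA XAResource. */
final class ToyParticipant {
    final String name;
    private final boolean failOnPrepare;
    private final boolean failOnCommit;
    String state = "ACTIVE";

    ToyParticipant(String name, boolean failOnPrepare, boolean failOnCommit) {
        this.name = name;
        this.failOnPrepare = failOnPrepare;
        this.failOnCommit = failOnCommit;
    }

    void prepare() {
        if (failOnPrepare) {
            throw new IllegalStateException(name + " cannot prepare");
        }
        state = "PREPARED";
    }

    void commit() {
        if (failOnCommit) {
            throw new IllegalStateException(name + " failed during commit");
        }
        state = "COMMITTED";
    }

    void rollback() {
        state = "ROLLED_BACK";
    }
}

/** Hypothetical coordinator that drives both phases and reports in-doubt participants. */
final class ToyCoordinator {
    /** Returns the names of participants left prepared but not committed. */
    static List<String> complete(List<ToyParticipant> participants) {
        // Phase 1: a failure here is safe, because nothing has been committed yet.
        try {
            for (ToyParticipant p : participants) {
                p.prepare();
            }
        } catch (IllegalStateException e) {
            for (ToyParticipant p : participants) {
                p.rollback();
            }
            return new ArrayList<>();
        }
        // Phase 2: the decision is "commit". A failure here cannot be undone by
        // rolling back the others; the failed participant stays in doubt.
        List<String> inDoubt = new ArrayList<>();
        for (ToyParticipant p : participants) {
            try {
                p.commit();
            } catch (IllegalStateException e) {
                inDoubt.add(p.name);
            }
        }
        return inDoubt;
    }
}

final class ToyTwoPhaseCommitDemo {
    public static void main(String[] argv) {
        ToyParticipant db = new ToyParticipant("db", false, false);
        ToyParticipant queue = new ToyParticipant("queue", false, true);
        List<String> inDoubt = ToyCoordinator.complete(Arrays.asList(db, queue));
        System.out.println("db=" + db.state + " queue=" + queue.state
                + " inDoubt=" + inDoubt);
    }
}
```

In the demo, the database commits while the queue remains prepared. This is the kind of scenario that the recovery tests of a real JTA implementation need to cover.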
All the JTA implementations we looked at provide recovery test cases, which make it easy to run tests against the implementation itself and against the participating XA resources as well. Please note that using XA may turn out to be a significant performance concern, especially when transaction volumes are large. One should also look at support for 2PC optimizations such as the "last resource commit", which might fit application needs where exactly one of the participating resources cannot or need not support 2PC. Care should be taken about the XA features supported and the restrictions imposed, if any, by the vendors of the database or the message provider. For example, MySQL doesn't support suspend() and resume() operations, also seems to have some restrictions on using XA, and in some situations might even leave the data in an inconsistent state. To learn more about XA, Principles of Transaction Processing is a very good book, which covers the 2PC failure conditions and optimizations in great detail. Mike Spille's blog (see the Resources section) is another good resource; it focuses on XA within the JTA context and provides a wealth of information, especially on failures during 2PC.
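The "last resource commit" optimization mentioned above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the algorithm of any particular JTA implementation: TwoPhaseResource, OnePhaseResource, RecordingResource and LastResourceCommit are hypothetical names, and transaction logging and recovery are omitted entirely. The idea is to prepare every 2PC-capable resource first, then commit the single one-phase resource, and to treat the outcome of that commit as the decision for everyone else.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical 2PC-capable resource; a simplified stand-in, not XAResource. */
interface TwoPhaseResource {
    boolean prepare();   // true means "votes to commit"
    void commit();
    void rollback();
}

/** Hypothetical resource that can only commit in a single step. */
interface OnePhaseResource {
    boolean commit();    // true means committed, false means it failed
}

/** Hypothetical resource that records the calls it receives, for illustration. */
final class RecordingResource implements TwoPhaseResource {
    private final String name;
    private final List<String> events;

    RecordingResource(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    public boolean prepare() { events.add(name + ".prepare"); return true; }
    public void commit() { events.add(name + ".commit"); }
    public void rollback() { events.add(name + ".rollback"); }
}

final class LastResourceCommit {
    /** Returns true if the whole transaction committed. */
    static boolean complete(List<TwoPhaseResource> xaResources, OnePhaseResource last) {
        // Step 1: prepare all resources that support 2PC.
        for (TwoPhaseResource r : xaResources) {
            if (!r.prepare()) {
                for (TwoPhaseResource other : xaResources) {
                    other.rollback();
                }
                return false;
            }
        }
        // Step 2: the one-phase commit of the last resource is the decision point.
        boolean committed = last.commit();
        // Step 3: apply that decision to the prepared resources.
        for (TwoPhaseResource r : xaResources) {
            if (committed) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return committed;
    }
}

final class LastResourceCommitDemo {
    public static void main(String[] argv) {
        List<String> events = new ArrayList<>();
        TwoPhaseResource db = new RecordingResource("db", events);
        boolean ok = LastResourceCommit.complete(
                Collections.singletonList(db),
                () -> { events.add("legacy.commit"); return true; });
        System.out.println(ok + " " + events);
    }
}
```

Note that the optimization weakens the guarantees: if the coordinator fails after the last resource commits but before the decision is recorded, the prepared resources cannot be resolved safely, so it should be used only when that risk is acceptable.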
When using the Spring framework for sending and receiving JMS messages, one should be wary of using the JmsTemplate and the DefaultMessageListenerContainer when running in a non-J(2)EE environment. In the case of JmsTemplate, a new JMS connection is created for every message that is sent. The same is true when using the DefaultMessageListenerContainer to receive messages. Creating a heavyweight JMS connection is very resource-intensive, and the application may not scale well under heavy loads. One option is to look for some sort of connection/session pooling support, either from the JMS provider or from third-party libraries. Another option is to use the SingleConnectionFactory from Spring, which makes it easy to configure a single connection that can be reused. Sessions are, however, still created for every message being sent, but this may not be a real overhead since JMS sessions are lightweight. The same applies when messages are being received, whether or not they are transactional.
Conclusion
In this article we saw how the Spring framework can be integrated with JTA implementations to provide distributed transactions, and how it can cater to the needs of an application that requires distributed transactions without a full-blown JEE server. The article also showcased how the Spring framework provides POJO-based solutions and declarative transaction management with minimal intrusion while promoting best design practices. The use cases we saw also demonstrated how Spring provides a rich transaction management abstraction, enabling us to switch easily between different JTA providers.
Author bio
Murali Kosaraju works as a technical architect at Wachovia Bank in Charlotte, North Carolina. He holds a master's degree in Systems and Information Engineering, and his interests include messaging, service-oriented architectures (SOA), Web Services, JEE and .NET-centric applications. He currently lives with his wife Vidya and son Vishal in South Carolina.
Learn more about this topic
Download the source code (8M Zip file).
X/Open XA
JTA API
Spring Framework
ActiveMQ
Mike Spille's Blog
MySQL XA issues
JTA Implementations: