本文讲的不是仅限 mysql 的多数据源,而是 mysql + mongo 两种数据源共用一个事务的做法。
环境:
mysql: 5.1.27
mongodb: 4.2.2
springboot: 2.2.2
在前n天遇到一个业务是用到mysql和mongo。都有写操作,想到急需一个事务管理的东东,上网搜了一遍,发现mysql和mongo是各自独立事务管理的。mongo用的是 MongoTransactionManager(要依赖的等会给),mysql用的是 DataSourceTransactionManager 。
刚开始我选择的一种思路是给两个 TransactionManager @Bean方式配置上。发现 @Transactional 注解方式仅支持一种事务管理(我看了一下代码方式的事务好像也是有冲突,具体冲突等会讲)@Transactional(transactionManager = "transactionManager")。这个仅支持一种事务管理,要从一个叫 AbstractPlatformTransactionManager 的类说起。AbstractPlatformTransactionManager 是所有事务管理的一个抽象类,其作用是给spring可调用组件式的事务管理继承类(如MongoTransactionManager、DataSourceTransactionManager)。
查看 AbstractPlatformTransactionManager 源代码,该类的开始事务流程需要自行去查看。
下面我要说一下与整个事务改造的相关类。
AbstractPlatformTransactionManager 有一个顺序特别要注意,就是子事务管理类操作完doBegin后才会进行 TransactionSynchronizationManager 相关的初始化,该类与事务上下文有着紧密的联系,事务该不该做都会先咨询该类。该类由一堆 ThreadLocal 管理事务的独立,可自行查看源码。
查看 TransactionSynchronizationManager 后得知,一个线程仅允许一个事务管理器开启事务。由于 MongoTransactionManager 与 DataSourceTransactionManager 都会操作 TransactionSynchronizationManager,二者在各自开启事务时都会去操作该类的 synchronizations(一个线程对应一个事务“链”,该变量存放的是 resourceList),因此会互相冲突。
显然两个事务管理都配置的方式不能满足我的需求
上网查资料重新整理思路,发现了一个新东西,jta+atomikos 分布式事务管理,但网上的案例全是mysql多数据源。
当时抱着不管三七二十一都加一下依赖来看看这东西玩法的心态就试了一下,通过不断阅读源码与理解,最后就搭建出来了。
先添加以下依赖:
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
implementation 'org.springframework.boot:spring-boot-starter-jta-atomikos'
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation 'mysql:mysql-connector-java:8.0.11'
看了一下 jta-atomikos 有一堆*Imp的类,一开始就想着估计是给用户自定义的吧。配置的时候也是创建一个叫UserTransactionImp() 的东西,注入给 Jta 的事务管理 JtaTransactionManager 。看了没啥特殊的依赖,直接复制他的源码,创建自己的 UserTransactionImp。
这里要特别注意的是:使用了 jta-atomikos 会直接管理 mysql 的事务,使用的是 atomikos 的 xa 事务。只需要单独处理 mongo 的事务
package com.shero.sport.web.conf;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.shero.comm.constant.TransactionManagerConstant;
import com.shero.sport.service.utils.MongoUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;
/**
 * Spring configuration that wires a single JTA transaction manager covering
 * both the MySQL XA data source (managed by Atomikos) and MongoDB (handled by
 * the custom {@link JtaTransactionImp}).
 */
@Configuration
public class TransactionConfiguration {

    /** Source of the {@code spring.datasource.*} settings read by {@link #build(String)}. */
    @Autowired
    Environment env;

    /**
     * Creates a {@link MongoTransactionManager} for the given factory.
     * <p>
     * Intentionally not registered as a bean (the annotation below is commented
     * out); it is only called directly by
     * {@link #transactionManager(MongoDbFactory, MongoUtils)}.
     *
     * @param factory the Mongo database factory to manage
     * @return a new Mongo transaction manager
     */
    // @Bean(name = TransactionManagerConstant.MONGO_TRANSACTION_MANAGER)
    public MongoTransactionManager mongoTransactionManager(MongoDbFactory factory) {
        return new MongoTransactionManager(factory);
    }

    /**
     * Creates a plain JDBC transaction manager and applies any available customizers.
     * <p>
     * Intentionally not registered as a bean (the annotation below is commented out).
     *
     * @param dataSource                    the data source to manage
     * @param transactionManagerCustomizers optional customizers to apply
     * @return a new JDBC transaction manager
     */
    // @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource,
            ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
        transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
        return transactionManager;
    }

    /**
     * Defines the primary MySQL data source as an Atomikos XA data source backed
     * by Druid's XA implementation, with a pool size of 5.
     *
     * @return the Atomikos-wrapped XA data source
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean(name = "primaryMysql")
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource primaryDataSource() throws Exception {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("primaryMysql");
        ds.setPoolSize(5);
        ds.setXaProperties(build("spring.datasource."));
        return ds;
    }

    /**
     * Defines the primary transaction manager: a {@link JtaTransactionManager}
     * whose {@link UserTransaction} is the custom {@link JtaTransactionImp} and
     * whose JTA transaction manager is Atomikos' {@link UserTransactionManager}.
     *
     * @param factory    the Mongo database factory
     * @param mongoUtils helper that switches the Mongo template's session synchronization
     * @return the JTA transaction manager
     */
    @Bean(name = TransactionManagerConstant.TRANSACTION_MANAGER)
    @Primary
    @Autowired
    public JtaTransactionManager transactionManager(MongoDbFactory factory, MongoUtils mongoUtils) {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new JtaTransactionImp(mongoTransactionManager(factory), mongoUtils);
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }

    /**
     * Collects the XA data source properties from the environment.
     * <p>
     * Keys that are not configured are skipped: {@link Properties} rejects
     * {@code null} values with a {@link NullPointerException}, which would
     * otherwise abort startup without naming the missing key.
     *
     * @param prefix the environment key prefix, e.g. {@code "spring.datasource."}
     * @return the XA properties that are present in the environment
     */
    private Properties build(String prefix) {
        Properties prop = new Properties();
        copyIfPresent(prop, "url", prefix + "url");
        copyIfPresent(prop, "username", prefix + "username");
        copyIfPresent(prop, "password", prefix + "password");
        copyIfPresent(prop, "driverClassName", prefix + "driver-class-name");
        return prop;
    }

    /** Copies one environment value into {@code target} under {@code xaKey} when it is configured. */
    private void copyIfPresent(Properties target, String xaKey, String envKey) {
        String value = env.getProperty(envKey);
        if (value != null) {
            target.put(xaKey, value);
        }
    }
}
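mysql 的数据库配置:参照上面 build 方法读取的属性(spring.datasource.url / username / password / driver-class-name),在 yml 或 properties 中配置即可。另外,上面的 XA 数据源使用了 com.alibaba.druid.pool.xa.DruidXADataSource,需要自行添加 druid 依赖。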
mongo则根据 spring-data-mongodb 配置即可
直接丢出一份自定义的 UserTransactionImp(这里命名为 JtaTransactionImp)
package com.shero.sport.web.conf;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.TransactionManagerImp;
import com.atomikos.util.SerializableObjectFactory;
import com.mongodb.MongoException;
import com.mongodb.TransactionOptions;
import com.mongodb.client.ClientSession;
import com.shero.sport.service.utils.MongoUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.MongoDatabaseUtils;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.lang.Nullable;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.ResourceHolderSupport;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionSynchronizationUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Objects;
/**
 * {@link UserTransaction} implementation that delegates to the Atomikos
 * transaction manager and, on commit/rollback, first completes the MongoDB
 * {@link ClientSession} transaction when a resource is bound to the Mongo
 * factory on the current thread.
 */
@Slf4j
public class JtaTransactionImp implements UserTransaction, Serializable, Referenceable {

    private static final long serialVersionUID = -865418426269785202L;

    private transient TransactionManager txmgr_;
    private transient MongoTransactionManager mongoTransactionManager;
    private final MongoUtils mongoUtils;

    /**
     * @param mongoTransactionManager supplies the {@link MongoDbFactory} used as the resource key
     * @param mongoUtils              toggles the Mongo template's session synchronization mode
     */
    public JtaTransactionImp(MongoTransactionManager mongoTransactionManager, MongoUtils mongoUtils) {
        this.mongoTransactionManager = mongoTransactionManager;
        this.mongoUtils = mongoUtils;
    }

    /**
     * Referenceable mechanism requires later setup of txmgr_, otherwise binding
     * into JNDI already requires that TM is running.
     */
    private void checkSetup() {
        synchronized (TransactionManagerImp.class) {
            txmgr_ = TransactionManagerImp.getTransactionManager();
            if (Objects.isNull(txmgr_)) {
                // No transaction manager available yet: initialise the transaction service, then retry.
                UserTransactionService uts = new UserTransactionServiceImp();
                uts.init();
                txmgr_ = TransactionManagerImp.getTransactionManager();
            }
        }
    }

    /**
     * Wraps whatever holder is currently bound for the Mongo factory in a
     * {@link MongoTransactionObject}.
     */
    private Object getMongoTransaction() throws TransactionException {
        MongoDbFactory mongoDbFactory = mongoTransactionManager.getDbFactory();
        // NOTE(review): this cast targets the nested MongoResourceHolder below; if the object bound
        // for the factory is Spring Data's own holder type this will throw ClassCastException —
        // confirm what is actually bound at runtime.
        MongoResourceHolder resourceHolder = (MongoResourceHolder) TransactionSynchronizationManager
                .getResource(mongoDbFactory);
        return new MongoTransactionObject(resourceHolder);
    }

    /** Casts the given transaction object, failing with a descriptive message on a type mismatch. */
    private static MongoTransactionObject extractMongoTransaction(Object transaction) {
        Assert.isInstanceOf(MongoTransactionObject.class, transaction,
                () -> String.format("Expected to find a %s but it turned out to be %s.", MongoTransactionObject.class,
                        transaction.getClass()));
        return (MongoTransactionObject) transaction;
    }

    /**
     * @param definition the transaction definition
     * @return the definition's timeout, or {@link TransactionDefinition#TIMEOUT_DEFAULT} if none is set
     */
    protected int determineTimeout(TransactionDefinition definition) {
        if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
            return definition.getTimeout();
        }
        return TransactionDefinition.TIMEOUT_DEFAULT;
    }

    /**
     * Obtains the current Mongo {@link ClientSession} through the private
     * {@code MongoDatabaseUtils#doGetSession} method and wraps it in a holder.
     *
     * @param definition used only to derive the holder's timeout
     * @return a holder for the session, or {@code null} if no session could be obtained
     */
    private MongoResourceHolder newResourceHolder(TransactionDefinition definition) {
        MongoDbFactory dbFactory = mongoTransactionManager.getDbFactory();
        ClientSession session = null;
        try {
            Method doGetSession = MongoDatabaseUtils.class.getDeclaredMethod(
                    "doGetSession", MongoDbFactory.class, SessionSynchronization.class);
            doGetSession.setAccessible(true);
            // doGetSession is a static utility method, so no receiver instance is needed.
            session = (ClientSession) doGetSession.invoke(null, dbFactory, SessionSynchronization.ALWAYS);
        } catch (Exception e) {
            // Log the exception itself: getCause() is null for lookup/access failures and would hide them.
            log.error("getSession err;", e);
        }
        if (Objects.isNull(session)) {
            return null;
        }
        MongoResourceHolder resourceHolder = new MongoResourceHolder(session, dbFactory);
        resourceHolder.setTimeoutIfNotDefaulted(determineTimeout(definition));
        return resourceHolder;
    }

    /** Renders a session's state for error messages; never throws a {@link RuntimeException}. */
    private static String debugString(@Nullable ClientSession session) {
        if (session == null) {
            return "null";
        }
        String debugString = String.format("[%s@%s ", ClassUtils.getShortName(session.getClass()),
                Integer.toHexString(session.hashCode()));
        try {
            if (session.getServerSession() != null) {
                debugString += String.format("id = %s, ", session.getServerSession().getIdentifier());
                debugString += String.format("causallyConsistent = %s, ", session.isCausallyConsistent());
                debugString += String.format("txActive = %s, ", session.hasActiveTransaction());
                debugString += String.format("txNumber = %d, ", session.getServerSession().getTransactionNumber());
                // isClosed() is a boolean: %d would throw IllegalFormatConversionException here.
                debugString += String.format("closed = %b, ", session.getServerSession().isClosed());
                debugString += String.format("clusterTime = %s", session.getClusterTime());
            } else {
                debugString += "id = n/a, ";
                debugString += String.format("causallyConsistent = %s, ", session.isCausallyConsistent());
                debugString += String.format("txActive = %s, ", session.hasActiveTransaction());
                debugString += String.format("clusterTime = %s", session.getClusterTime());
            }
        } catch (RuntimeException e) {
            debugString += String.format("error = %s", e.getMessage());
        }
        debugString += "]";
        return debugString;
    }

    /**
     * Begins the JTA transaction, then switches the Mongo template to
     * {@code SessionSynchronization.ALWAYS} via {@link MongoUtils}.
     *
     * @see javax.transaction.UserTransaction
     */
    public void begin() throws NotSupportedException, SystemException {
        checkSetup();
        txmgr_.begin();
        mongoUtils.setSessionSynchronizationForTransactionBegin();
    }

    /**
     * Commits the Mongo session transaction first (if a resource is bound for
     * the Mongo factory), then commits the JTA transaction.
     *
     * @throws TransactionSystemException if the Mongo commit or its cleanup fails
     * @see javax.transaction.UserTransaction
     */
    public void commit() throws javax.transaction.RollbackException,
            javax.transaction.HeuristicMixedException,
            javax.transaction.HeuristicRollbackException,
            javax.transaction.SystemException, java.lang.IllegalStateException,
            java.lang.SecurityException {
        if (Objects.nonNull(TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()))) {
            MongoTransactionObject mongoTransactionObject = extractMongoTransaction(getMongoTransaction());
            MongoResourceHolder resourceHolder = newResourceHolder(TransactionDefinition.withDefaults());
            mongoTransactionObject.setResourceHolder(resourceHolder);
            try {
                mongoTransactionObject.commitTransaction();
                TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
                mongoTransactionObject.getRequiredResourceHolder().clear();
                mongoTransactionObject.closeSession();
                mongoUtils.setSessionSynchronizationForTransactionCompletion();
            } catch (Exception ex) {
                // NOTE(review): the JTA transaction is neither committed nor rolled back on this path;
                // the caller is expected to invoke rollback() — confirm.
                throw new TransactionSystemException(String.format("Could not commit Mongo transaction for session %s.",
                        debugString(mongoTransactionObject.getSession())), ex);
            }
        }
        checkSetup();
        txmgr_.commit();
    }

    /**
     * Aborts the Mongo session transaction first (if a resource is bound for
     * the Mongo factory), then rolls back the JTA transaction. The JTA rollback
     * is attempted even when the Mongo part fails, so the JTA transaction is
     * not left open; if both fail, the JTA exception is the one propagated.
     *
     * @throws TransactionSystemException if the Mongo abort fails with a {@link MongoException}
     * @see javax.transaction.UserTransaction
     */
    public void rollback() throws IllegalStateException, SystemException, SecurityException {
        try {
            if (Objects.nonNull(TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()))) {
                MongoTransactionObject mongoTransactionObject = extractMongoTransaction(getMongoTransaction());
                MongoResourceHolder resourceHolder = newResourceHolder(TransactionDefinition.withDefaults());
                mongoTransactionObject.setResourceHolder(resourceHolder);
                try {
                    mongoTransactionObject.abortTransaction();
                    TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
                    mongoTransactionObject.getRequiredResourceHolder().clear();
                    mongoTransactionObject.closeSession();
                    mongoUtils.setSessionSynchronizationForTransactionCompletion();
                } catch (MongoException ex) {
                    throw new TransactionSystemException(String.format("Could not abort Mongo transaction for session %s.",
                            debugString(mongoTransactionObject.getSession())), ex);
                }
            }
        } finally {
            checkSetup();
            txmgr_.rollback();
        }
    }

    /**
     * @see javax.transaction.UserTransaction
     */
    public void setRollbackOnly() throws IllegalStateException, SystemException {
        checkSetup();
        txmgr_.setRollbackOnly();
    }

    /**
     * @see javax.transaction.UserTransaction
     */
    public int getStatus() throws SystemException {
        checkSetup();
        return txmgr_.getStatus();
    }

    /**
     * @see javax.transaction.UserTransaction
     */
    public void setTransactionTimeout(int seconds) throws SystemException {
        checkSetup();
        txmgr_.setTransactionTimeout(seconds);
    }

    //
    //
    // IMPLEMENTATION OF REFERENCEABLE
    //
    //

    public Reference getReference() throws NamingException {
        return SerializableObjectFactory.createReference(this);
    }

    /**
     * Transaction object wrapping an optional {@link MongoResourceHolder}.
     *
     * @see org.springframework.data.mongodb.MongoResourceHolder
     */
    protected static class MongoTransactionObject implements SmartTransactionObject {

        private @Nullable
        MongoResourceHolder resourceHolder;

        MongoTransactionObject(@Nullable MongoResourceHolder resourceHolder) {
            this.resourceHolder = resourceHolder;
        }

        /**
         * Set the {@link MongoResourceHolder}.
         *
         * @param resourceHolder can be {@literal null}.
         */
        void setResourceHolder(@Nullable MongoResourceHolder resourceHolder) {
            this.resourceHolder = resourceHolder;
        }

        /**
         * @return {@literal true} if a {@link MongoResourceHolder} is set.
         */
        final boolean hasResourceHolder() {
            return resourceHolder != null;
        }

        /**
         * Start a MongoDB transaction optionally given {@link TransactionOptions}.
         *
         * @param options can be {@literal null}
         */
        void startTransaction(@Nullable TransactionOptions options) {
            ClientSession session = getRequiredSession();
            if (options != null) {
                session.startTransaction(options);
            } else {
                session.startTransaction();
            }
        }

        /**
         * Commit the transaction.
         */
        public void commitTransaction() {
            getRequiredSession().commitTransaction();
        }

        /**
         * Rollback (abort) the transaction.
         */
        public void abortTransaction() {
            getRequiredSession().abortTransaction();
        }

        /**
         * Close a {@link ClientSession} without regard to its transactional state.
         */
        void closeSession() {
            ClientSession session = getRequiredSession();
            if (session.getServerSession() != null && !session.getServerSession().isClosed()) {
                session.close();
            }
        }

        /**
         * @return the session of the current holder, or {@literal null} if there is no holder.
         */
        @Nullable
        public ClientSession getSession() {
            return resourceHolder != null ? resourceHolder.getSession() : null;
        }

        /**
         * @return the current holder.
         * @throws IllegalStateException if no holder is set.
         */
        private MongoResourceHolder getRequiredResourceHolder() {
            Assert.state(resourceHolder != null, "MongoResourceHolder is required but not present. o_O");
            return resourceHolder;
        }

        /**
         * @return the current session.
         * @throws IllegalStateException if no session is available.
         */
        private ClientSession getRequiredSession() {
            ClientSession session = getSession();
            Assert.state(session != null, "A Session is required but it turned out to be null.");
            return session;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.transaction.support.SmartTransactionObject#isRollbackOnly()
         */
        @Override
        public boolean isRollbackOnly() {
            return this.resourceHolder != null && this.resourceHolder.isRollbackOnly();
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.transaction.support.SmartTransactionObject#flush()
         */
        @Override
        public void flush() {
            TransactionSynchronizationUtils.triggerFlush();
        }
    }

    /**
     * Resource holder pairing a {@link ClientSession} with its {@link MongoDbFactory}.
     * Declared {@code static} because it never uses the enclosing instance.
     *
     * @see org.springframework.data.mongodb.MongoResourceHolder
     */
    static class MongoResourceHolder extends ResourceHolderSupport {

        private @Nullable ClientSession session;
        private MongoDbFactory dbFactory;

        /**
         * Create a new {@link com.shero.sport.web.conf.JtaTransactionImp.MongoResourceHolder} for a given {@link ClientSession session}.
         *
         * @param session the associated {@link ClientSession}. Can be {@literal null}.
         * @param dbFactory the associated {@link MongoDbFactory}. must not be {@literal null}.
         */
        MongoResourceHolder(@Nullable ClientSession session, MongoDbFactory dbFactory) {
            this.session = session;
            this.dbFactory = dbFactory;
        }

        /**
         * @return the associated {@link ClientSession}. Can be {@literal null}.
         */
        @Nullable
        ClientSession getSession() {
            return session;
        }

        /**
         * @return the required associated {@link ClientSession}.
         * @throws IllegalStateException if no {@link ClientSession} is associated with this {@link com.shero.sport.web.conf.JtaTransactionImp.MongoResourceHolder}.
         * @since 2.1.3
         */
        ClientSession getRequiredSession() {
            ClientSession session = getSession();
            if (session == null) {
                throw new IllegalStateException("No session available!");
            }
            return session;
        }

        /**
         * @return the associated {@link MongoDbFactory}.
         */
        public MongoDbFactory getDbFactory() {
            return dbFactory;
        }

        /**
         * Set the {@link ClientSession} to guard.
         *
         * @param session can be {@literal null}.
         */
        public void setSession(@Nullable ClientSession session) {
            this.session = session;
        }

        /**
         * Only set the timeout if it does not match the {@link TransactionDefinition#TIMEOUT_DEFAULT default timeout}.
         *
         * @param seconds the timeout in seconds
         */
        void setTimeoutIfNotDefaulted(int seconds) {
            if (seconds != TransactionDefinition.TIMEOUT_DEFAULT) {
                setTimeoutInSeconds(seconds);
            }
        }

        /**
         * @return {@literal true} if session is not {@literal null}.
         */
        boolean hasSession() {
            return session != null;
        }

        /**
         * @return {@literal true} if the session is active and has not been closed.
         */
        boolean hasActiveSession() {
            if (!hasSession()) {
                return false;
            }
            return hasServerSession() && !getRequiredSession().getServerSession().isClosed();
        }

        /**
         * @return {@literal true} if the session has an active transaction.
         * @since 2.1.3
         * @see #hasActiveSession()
         */
        boolean hasActiveTransaction() {
            if (!hasActiveSession()) {
                return false;
            }
            return getRequiredSession().hasActiveTransaction();
        }

        /**
         * @return {@literal true} if the {@link ClientSession} has a {@link com.mongodb.session.ServerSession} associated
         *         that is accessible via {@link ClientSession#getServerSession()}.
         */
        boolean hasServerSession() {
            try {
                return getRequiredSession().getServerSession() != null;
            } catch (IllegalStateException serverSessionClosed) {
                // ignore
            }
            return false;
        }
    }
}
再贴一下MongoUtils
/**
 * Small helper that flips the {@link MongoTemplate}'s session synchronization
 * mode at the start and at the completion of a transaction.
 */
@Component
public class MongoUtils {

    /** Template whose session synchronization mode is switched. */
    @Autowired
    MongoTemplate mongoTemplate;

    /** Switches the template to {@link SessionSynchronization#ALWAYS}. */
    public void setSessionSynchronizationForTransactionBegin() {
        mongoTemplate.setSessionSynchronization(SessionSynchronization.ALWAYS);
    }

    /** Switches the template to {@link SessionSynchronization#ON_ACTUAL_TRANSACTION}. */
    public void setSessionSynchronizationForTransactionCompletion() {
        mongoTemplate.setSessionSynchronization(SessionSynchronization.ON_ACTUAL_TRANSACTION);
    }
}
先讲一下上面 JtaTransactionImp 这个类,从 begin 开始,然后讲一下 commit (rollback一样道理)
单独贴一下 begin 方法:
/**
 * Begins the JTA transaction, then asks {@code mongoUtils} to switch the Mongo
 * session synchronization for the started transaction.
 * The order matters: the JTA transaction is begun before the switch.
 */
public void begin() throws NotSupportedException, SystemException {
    // Make sure txmgr_ is resolved before delegating to it.
    checkSetup();
    txmgr_.begin();
    mongoUtils.setSessionSynchronizationForTransactionBegin();
}
我们在整个事务上下文中需要关注 mongoClientSession ,因为主要由 session 控制事务。
开始事务时要关注一下 JtaTransactionImp#newResourceHolder ,该方法参考了 MongoDatabaseUtils#doGetSession 方法。AbstractPlatformTransactionManager 会先开启事务,再初始化 synchronizations ,所以不能直接在 begin 里初始化 mongo 的 session:doGetSession 会向 synchronizations 注册一个同步对象,而此时 synchronizations 还没被初始化出来。因此要在开启事务并初始化 synchronizations 之后,再进行 newResourceHolder 。要控制 doGetSession 在 synchronizations 初始化之后才被调用,需要关注 mongoTemplate.sessionSynchronization :mongoTemplate 被调用时,获取 database 会先调用 MongoDatabaseUtils#doGetSession 方法,而 sessionSynchronization 这个变量决定是否注册一个 mongo 事务。我就是利用这一点来控制时机的,并且拿到 session 后就可以对 session 进行事务设置。
在前面开启事务后,重点是commit和rollback
先贴一下
/**
 * Commits the Mongo session transaction first (only when a resource is bound
 * for the Mongo factory on the current thread), then commits the JTA transaction.
 */
public void commit() throws javax.transaction.RollbackException,
        javax.transaction.HeuristicMixedException,
        javax.transaction.HeuristicRollbackException,
        javax.transaction.SystemException, java.lang.IllegalStateException,
        java.lang.SecurityException {
    boolean mongoResourceBound =
            TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()) != null;
    if (mongoResourceBound) {
        MongoTransactionObject txObject = extractMongoTransaction(getMongoTransaction());
        // Replace the holder with one built around the current session.
        txObject.setResourceHolder(newResourceHolder(TransactionDefinition.withDefaults()));
        try {
            txObject.commitTransaction();
            // Cleanup: unbind, clear the holder, close the session, reset the synchronization mode.
            TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
            txObject.getRequiredResourceHolder().clear();
            txObject.closeSession();
            mongoUtils.setSessionSynchronizationForTransactionCompletion();
        } catch (Exception ex) {
            String message = String.format("Could not commit Mongo transaction for session %s.",
                    debugString(txObject.getSession()));
            throw new TransactionSystemException(message, ex);
        }
    }
    // The JTA commit runs only after the Mongo part has succeeded (or was skipped).
    checkSetup();
    txmgr_.commit();
}
/**
 * Aborts the Mongo session transaction first (only when a resource is bound
 * for the Mongo factory on the current thread), then rolls back the JTA
 * transaction. The JTA rollback is attempted even when the Mongo part fails,
 * so the JTA transaction is not left open; if both fail, the JTA exception is
 * the one propagated.
 */
public void rollback() throws IllegalStateException, SystemException, SecurityException {
    try {
        if (Objects.nonNull(TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()))) {
            MongoTransactionObject mongoTransactionObject = extractMongoTransaction(getMongoTransaction());
            MongoResourceHolder resourceHolder = newResourceHolder(TransactionDefinition.withDefaults());
            mongoTransactionObject.setResourceHolder(resourceHolder);
            try {
                mongoTransactionObject.abortTransaction();
                TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
                mongoTransactionObject.getRequiredResourceHolder().clear();
                mongoTransactionObject.closeSession();
                mongoUtils.setSessionSynchronizationForTransactionCompletion();
            } catch (MongoException ex) {
                throw new TransactionSystemException(String.format("Could not abort Mongo transaction for session %s.",
                        debugString(mongoTransactionObject.getSession())), ex);
            }
        }
    } finally {
        // Always roll back the JTA side, even if the Mongo abort above threw.
        checkSetup();
        txmgr_.rollback();
    }
}
提交或回滚方法需要mongoSession的事务处理先行,再处理jta的
至此,自定义的 mysql + mongo多种数据源的 spring 分布式事务已完成。可以自行尝试。
路上总是坎坷的,但是扛过来了回头看,路还是那么平坦。