前言
在前一篇中,我们提到了TransactionInterceptor的invoke方法会执行invokeWithinTransaction方法。其中提到了几个关键方法但是没有具体分析,本文将主要通过事务实现的过程来对invokeWithinTransaction中的逻辑进行解剖。另外事务是TransactionInterceptor借助事务处理器来实现的,spring在AbstractPlatformTransactionManager中定义了事务实现的模板,从总体上决定了何时创建事务、如何处理事务的传播行为、何时提交何时回滚,而具体是如何实现创建、回滚等操作是通过具体的事务处理器(如DataSourceTransactionManager)重写某些模板中的方法来实现的。
事务创建
事务创建体现在createTransactionIfNecessary
这个方法。这里提到的事务创建可不仅仅是说一个新的事务是如何创建的,而是一个比较宽泛的概念,它涉及在配置了传播行为时前后事务间是如何组织的,要不要创建一个事务,事务的配置信息和状态,当然底层也会借助事务处理器来实现具体的操作。
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
在这个方法中主要做了三个事:一是将事务属性进行封装,当getName时,得到的是连接点的表示,基本就是全限定性类名.xx方法形式;二是借助具体的事务处理器(或者说管理器)来创建事务(这个是重点),并得到事务的状态;三是封装一个TransactionInfo 实例返回,它主要包含了事务的配置信息、事务状态以及事务与线程绑定(线程的绑定是与事务传播行为有关的)。
重点看一下getTransaction方法。他是 AbstractPlatformTransactionManager 的方法,给出了创建事务的实现模板,具体的方法如 doGetTransaction 和 doBegin需要交由具体的平台事务管理器来实现(例如DataSourceTransactionManager)。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 事务对象的创建,有具体的事务处理器实现
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
// 没有传入事务属性的时候,采用默认的事务配置
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// 当检测到当前线程已有事务,则根据传播行为决定如何进行事务处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 下面的代码根据不同传播行为进行不同的事务处理方式
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
// MANDATORY传播行为表明必须在已有事务中执行,没有事务的话就抛异常
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 一般是true
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建status记录事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 由具体的事务管理器来决定如何启动事务
doBegin(transaction, definition);
// 将一些配置添加到TransactionSynchronizationManager的一系列ThreadLocal中与线程绑定
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// 其他的传播行为例如NEVER,创建空的事务。Create "empty" transaction: no actual transaction, but potentially synchronization.
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
事务处理器创建事务对象
最开始doGetTransaction会由事务处理器创建事务对象。看DataSourceTransactionManager中的源码:
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
它返回的事务对象是DataSourceTransactionObject,它及它的父类JdbcTransactionObjectSupport主要封装了ConnectionHolder(对Connection的封装)及其他配置。在线程第一次调用getResources时,当前线程还未绑定什么resources,也就没法通过dataSource获取到东西,所以最初conHolder为null。
随后有两种情况,一种是当前线程已经有事务,那么handleExistingTransaction根据传播行为进行事务处理(例如需不需要创建新的事务还是在已有事务中进行)。另一种便是当前线程不存在事务。
当不存在已有事务时
对于PROPAGATION_REQUIRED、REQUIRES_NEW、PROPAGATION_NESTED这几种传播行为,都需要建立新的事务。对于PROPAGATION_NOT_SUPPORTED,不需要事务中执行,则创建一个空事务。对于PROPAGATION_MANDATORY,由于必须要存在已有事务,所以直接抛异常。
方法返回一个DefaultTransactionStatus,它用来记录事务的状态(通过不同的字段),主要包括事务属性配置、事务对象、是否是新的事务等。PROPAGATION_REQUIRED、REQUIRES_NEW、PROPAGATION_NESTED由于在无事务时需要在新的事务下运行,所以doBegin方法就完成了启动新的事务。随后的prepareSynchronization就是完成向TransactionSynchronizationManager的ThreadLocal的注册,可以认为是事务与线程的绑定。TransactionSynchronizationManager维护了一系列的ThreadLocal用户保持事务属性(可以理解为线程是事务的执行者,然后TransactionSynchronizationManager通过ThreadLocal把事务属性告诉给了线程),如事务的隔离级别以及事务是否active。
看一下DataSourceTransactionManager的doBegin是如何启动事务的。
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 对con设置是否只读以及事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 保存前一个事务的状态
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
//最重要的一行代码,开启了事务。
con.setAutoCommit(false);
}
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定线程的Resources
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
一开始当前线程是没有连接的,所以通过数据源拿到一个连接。随后向connection配置了是否只读以及事务隔离级别,并对事务对象配置进行修改。最重要的还是con.setAutoCommit(false)打开了事务。以及将connection信息封装后与datasource一起关联到线程。
最终事务创建完成并返回status实例后,TransactionInfo会将status封装进来,并且他也通过一个threadLocal将自己与事务线程关联起来。随后事务线程就可以通过这个ThreadLocal获取TransactionInfo,进而也就可以获取TransactionStatus。
当存在已有事务时
处理已有事务由handleExistingTransaction方法完成。这个方法比较长但是所完成的就是根据不同的传播行为进行不同处理。所以就只在需要的时候贴部分代码。
·对于配置为NEVER的方法来说,不允许在事务中执行,所以直接抛异常。
·对于PROPAGATION_NOT_SUPPORTED,需要挂起(关于挂起,下一小节会讲)当前事务并在无事务状态下进行。将挂起信息放入status中。
· 对于PROPAGATION_REQUIRES_NEW,将当前事务挂起并开启新事务。
· 对于PROPAGATION_NESTED,如果已经存在事务则在嵌套事务中执行(并不是新的事务),它的实现是通过设置保存点savepoint。
· 对于PROPAGATION_SUPPORTS(有事务就在事务中执行没有则不建立事务运行)和PROPAGATION_REQUIRED(有事务则加入到当前事务中执行,没有的话建立新事务执行),在已存在事务情况下,两种情况都需要在已有事务下运行,所以进行判断,当后来者的事务隔离级别是ISOLATION_DEFAULT时,可以在已有事务中运行,如果不是ISOLATION_DEFAULT并且前后两者的事务隔离级别不同,则抛出异常。从最后的返回status看出,这两类传播行为下并没有创建新的事务(第三个参数)。
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
事务挂起
对于需要挂起当前已有事务的传播行为例如PROPAGATION_NOT_SUPPORTED和PROPAGATION_REQUIRES_NEW来说,挂起操作是通过suspend(transaction)实现,并且在事务Status中将挂起信息存入。
谈一下我对事务挂起的理解,可能并不是很准确。后来者的事务属性配置为PROPAGATION_NOT_SUPPORTED和PROPAGATION_REQUIRES_NEW需要将当前事务挂起。而由于两者都是在同一个线程下,而status中的属性是要与线程绑定的,那么后来者的status更新后,还必须将已有的事务的状态配置信息保存来。所以我们也就可以看出挂起所做的操作:一是将已有事务的状态信息保存起来,二是将其在线程上的绑定信息清除,留给后来者使用,三是后来者的status信息要记录下来已有事务的信息,以便于后续此线程可以获取到先前事务信息。
查看suspend方法,发现大致也就是按照上面说的,具体解释看注释
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 主要是将当前线程关联的connectionHolder,从线程中移除,并返回。
suspendedResources = doSuspend(transaction);
}
// 将已有事务的配置信息取出,并在TransactionSynchronizationManager取消跟当前线程的关联,方便线程留给后面可能的事务来用。
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 挂起后,将所挂起的事务的信息封装成SuspendedResourcesHolder后返回。
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
最终在status中会将挂起信息加入。
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
事务提交
在TransactionInterpretor的invoke方法中,通过invokeWithinTransaction方法来实现事务,在createTransactionIfNecessary进行了诸如事务的创建、事务与线程的绑定、事务间传播行为的解决、事务的挂起等操作,虽然从名字看,这个方法理解为必要时创建事务,但是我认为理解成它完成了在执行具体目标方法前进行了事务的准备操作比较好。随后invokeWithinTransaction中retVal = invocation.proceedWithInvocation()回调方法执行完成的就是将aop执行链继续执行最终执行到目标方法。如果执行过程中没有出现任何异常,会将事务提交。commitTransactionAfterReturning(txInfo)就是完成了事务提交,并且这个方法依然是调用了事务管理器的commit方法完成提交过程。
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
我们进入到DataSourceTransactionManager的commit方法,发现先进行了一些判断要不要回滚最后processCommit用于执行提交。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
// 设有保存点(例如NESTED的事务)正确执行后,将保存点清除
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
// 当前事务是一个新的事务的时候,执行提交。
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
/*catch到异常的时候如何处理,省略*/
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
triggerXXXXX的方法是执行在TransactionSynchronizationManager中当前线程关联的TransactionSynchronization集合中每一个TransactionSynchronization的XXXXX方法。重点代码:doCommit方法。当前事务为新的事务时,会对事务进行提交,否则不提交。例如当第二个事务配置成了REQUIRED,所以当第二个事务(严格来说这里说是事务不太合适,应该说第二个配置了事务的方法)运行时他是不会提交的,因为它处于第一个事务中。其实doCommit方法就是....
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
// 这个代码就不用分析了吧,跟jdbc中的一样
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
在doCommit提交事务后,最后在finally中执行cleanupAfterCompletion对事务释放及修改事务状态。见注释。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 设置事务状态为已完成
status.setCompleted();
if (status.isNewSynchronization()) {
// NewSynchronization的事务执行完毕后,将此线程的状态信息相关的threadLocal清除
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
// 如果有挂起的事务,则根据挂起事务信息复原原来的事务
resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
事务回滚
当事务执行过程中出错时,捕获到抛出的异常后会执行completeTransactionAfterThrowing(txInfo, ex)
方法。
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
/*省略*/
}
else {
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
/*省略*/
}
}
}
这个方法的主要逻辑时,判断当前抛出的异常是否可以回滚,spring对于运行时异常以及错误error是默认支持回滚的,而对于受查异常,当用户自定义了回滚规则后,当前事务的事务属性TransactionAttribute中便存在此条规则,那么rollbackOn(ex)
方法最终返回true,即设置了回滚的受查异常也可以支持回滚。回滚操作rollback由事务处理器给出模板实现,其中调用了processRollback(defStatus)
方法。
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
逻辑比较简单清晰:如果事务存在着保存点,则回滚到保存点;如果当前事务是一个新的事务,则执行doRollback完整的回滚当前事务(doRollback内部很简单,这里不分析了);如果当前事务存在外部事务,且内部事务执行出错时由外部事务负责回滚,则doSetRollbackOnly将内部事务设置为rollback-only。
稍微提一下这个rollback-only,上文说completeTransactionAfterThrowing(下面又贴出了这部分代码)当判断出抛出的异常支持回滚的时候,执行rollback以回滚,而不支持时执行commit,但是并不代表事务会提交,点开DataSourceTransactionManager的commit方法会发现,它会先判断当前事务是不是被标记为了rollback-only,如果是的话则依然执行回滚操作。这种情况可以是这样的一个例子,例如方法B的事务配置为REQUIRED,方法A调用方法B,方法A已存在事务中,那么方法B不会新建事务,而是依附在事务A中执行,当B抛出异常时,按照上文所述,将会调用doSetRollbackOnly方法将当前事务设置为rollback-only,而B本身不会进行事务的回滚,B的回滚需要交由A的事务来处理,如果A的事务发现B抛出的异常可以回滚,那么就回滚事务,不能回滚的话,会发现事务已经被设置为rollback-only的,那么也会执行回滚。
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
/*省略*/
}
else {
// 当抛出的异常不支持回滚时,执行commit方法,但是其中并不是一定会提交,而是也存在回滚,例如事务为rollback-only时
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
/*省略*/
}
}
}