事务前置aop
开启事务,需要加上注解@EnableTransactionManagement,他其实就是往bean容器里面加入AutoProxyRegistrar和ProxyTransactionManagementConfiguration类,
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
(1)AutoProxyRegistrar这个主要是开启aop的
(2)ProxyTransactionManagementConfiguration这个主要是往aop里面注入一个advisor,
return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
------ProxyTransactionManagementConfiguration注入的advisor
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
//相当于pointCut
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
----------------不知道干啥的
//解析@Transactional上面注解的数据
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement ae) {
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(ae, Transactional.class, false, false);
return attributes != null ? this.parseTransactionAnnotation(attributes) : null;
}
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = (Propagation)attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = (Isolation)attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList();
Class<?>[] rbf = attributes.getClassArray("rollbackFor");...
}
事务传播机制为new时的事务大概流程
当方法被调用的时候会调用TransactionInterceptor的invoke方法,事务主要实现逻辑,
在方法执行前加一个事务,把他的autoCommit设置为false,当中途失败的时候会回滚这个事务
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method var10001 = invocation.getMethod();
invocation.getClass();
return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}
----------------
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取@transaction注解里面配置的属性
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器,可以在注解上指定transactionManager,如果在属性上没指定的话就从bean工厂里面取
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
...
// 新版本的话这边有检查数据源是否为PlatformTransactionManager,如果不是的话这边会报错
//PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 生成一个事务名称
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务(这边重点,包含数据库连接等,多事务的处理)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 正常执行其他的拦截器,以及业务方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 当执行过程中出现报错的时候执行,主要是回滚事务(默认这边只回滚runtimeException)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {...}
-------
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {...
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//获取一个数据库连接
status = tm.getTransaction(txAttr);
}
else {...
}
}
//将数据库连接绑定到threadLocal里面
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
-------------
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//调用事务管理器获取事务
Object transaction = doGetTransaction();
//如果已经存在事务,则根据传播机制来处理--->事务传播的话就在这里处理,下面单独写
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}...
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//调用事务管理器开启事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
//如果不是上面几种传播机制的话则走这边
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
---------------
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject)transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 将自动提交设置为false
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
当获取到事务后把事务设置到ThreadLocal<TransactionInfo> 里面
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
//在这边将事务保存到本地线程里
txInfo.bindToThread();
return txInfo;
}
当是多个事务里面的话,也就是涉及到事务传递(注,如果事务是默认的话,或者配置为required的话,几乎不做啥处理,直接用原连接池调用)
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//调用事务管理器获取事务
Object transaction = doGetTransaction();
//如果已经存在事务,则根据传播机制来处理--->事务传播的话就在这里处理,下面单独写
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}...
----------------
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//其实这边就是判断说本地线程里面的有没有数据库连接,并且得是active的
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
----------------
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//如果事务的传播机制是PROPAGATION_NEVER,则报错,他这个机制不能有事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//这边将旧线程的数据保存起来,挂起,重新创建一个连接出来
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
当抛出异常时
首先判断是否为runtimeException,只有这个类型的才会回滚
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
最主要的是这条,当一个事务抛出异常时,会把线程里面的数据库连接isRollbackOnly设置为true,
在提交的时候会判断这个数据类型是否回滚
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
txObject.setRollbackOnly();
}
当提交事务时
当发生异常时会判断isRollbackOnly是否为true,即defStatus.isGlobalRollbackOnly();
shouldCommitOnGlobalRollbackOnly这个方法是判断如果发生异常,全局是否都要一起回滚,默认是true
比如一条事务有两条插入语句,插入A,插入B,
当A执行完成,B报错时,如果shouldCommitOnGlobalRollbackOnly为true,A也会回滚,否则A不会回滚
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}