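Ever since distributed services became mainstream, distributed transactions have been a hot topic, and plenty of solutions circulate online, such as TCC and eventual consistency based on reliable messaging. Most of them, however, lean toward theory and lack practical examples. More importantly, these solutions require the constraints of distributed transactions to be designed into the business layer of the application: typically every service has to implement idempotent forward and reverse (compensating) interfaces. Design constraints like these tend to drive development and maintenance costs very high.
This is where an efficient solution with zero intrusion into business code comes in: seata (formerly Fescar), the distributed transaction middleware open-sourced by Alibaba. For an introduction to seata, see the official documentation, which already covers it well; this article discusses how to use seata and how it works. I strongly recommend reading the official seata documentation before this article, because many of the underlying principles are not covered in detail here.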
seata-server
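Like the XA model, seata uses three components to coordinate the processing of a distributed transaction.
- Transaction Coordinator (TC): maintains the running state of global transactions, and coordinates and drives the commit or rollback of a global transaction.
- Transaction Manager (TM): controls the boundary of a global transaction; it begins a global transaction and finally issues the decision to commit or roll back globally.
- Resource Manager (RM): controls branch transactions; it registers branches, reports their status, and receives instructions from the transaction coordinator to drive the commit or rollback of branch (local) transactions.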
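To make the division of labor concrete, here is a minimal in-memory sketch of the three roles. It is not the seata API: the class names (RolesSketch, Coordinator, Branch, RecordingBranch) are hypothetical, there is no networking, and the XID is just a random UUID. It only illustrates that the TM decides, the TC drives, and the RM-side branches carry out the decision.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of the TC/TM/RM roles; not the seata API. */
class RolesSketch {

    /** RM side: one branch (local) transaction that can be finished either way. */
    interface Branch {
        void commit();
        void rollback();
    }

    /** TC: tracks global transactions and drives every registered branch. */
    static class Coordinator {
        private final Map<String, List<Branch>> branches = new HashMap<>();

        /** Called by the TM; returns the id of the new global transaction. */
        String begin() {
            String xid = UUID.randomUUID().toString();
            branches.put(xid, new ArrayList<>());
            return xid;
        }

        /** Called by an RM when a local transaction joins the global one. */
        void registerBranch(String xid, Branch branch) {
            branches.get(xid).add(branch);
        }

        /** Global commit decided by the TM. */
        void commit(String xid) {
            for (Branch b : branches.remove(xid)) {
                b.commit();
            }
        }

        /** Global rollback decided by the TM. */
        void rollback(String xid) {
            for (Branch b : branches.remove(xid)) {
                b.rollback();
            }
        }
    }

    /** Branch that only records what it was told to do. */
    static class RecordingBranch implements Branch {
        private final String name;
        private final List<String> log;

        RecordingBranch(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        public void commit() { log.add(name + ":commit"); }

        public void rollback() { log.add(name + ":rollback"); }
    }

    /** TM: opens the global transaction, runs the work, then decides the outcome. */
    static List<String> run(boolean fail) {
        Coordinator tc = new Coordinator();
        List<String> log = new ArrayList<>();
        String xid = tc.begin();
        boolean ok = true;
        tc.registerBranch(xid, new RecordingBranch("assign", log));
        tc.registerBranch(xid, new RecordingBranch("asset", log));
        if (fail) {
            ok = false; // stands in for a business exception
        }
        if (ok) {
            tc.commit(xid);
        } else {
            tc.rollback(xid);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [assign:commit, asset:commit]
        System.out.println(run(true));  // [assign:rollback, asset:rollback]
    }
}
```

In the real system the three roles live in different processes, which is why the TC has to be deployed as a separate server.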
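So the TC must be deployed first to coordinate the whole transaction. We can download the seata-server source directly from https://github.com/seata/seata, open the server module, and edit its registry.conf file: change type under registry from the default file to zk, and set the zk serverAddr to your own ZooKeeper address.
type is the registry type. It supports file, nacos (Alibaba's open-source service registry and configuration center), zk, eureka, and others. Since our dubbo setup uses ZooKeeper as its registry, we set it to zk here.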
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "zk"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "zk.tbj.com:2181"
session.timeout = 6000
connect.timeout = 2000
}
Then start the Server class:
2019-06-12 20:31:19.255 INFO [main]org.I0Itec.zkclient.ZkClient.waitForKeeperState:936 -Waiting for keeper state SyncConnected
2019-06-12 20:31:19.266 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.logStartConnect:1025 -Opening socket connection to server zk.tbj.com/192.168.85.2:2181. Will not attempt to authenticate using SASL (unknown error)
2019-06-12 20:31:19.281 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.primeConnection:879 -Socket connection established to zk.tbj.com/192.168.85.2:2181, initiating session
2019-06-12 20:31:19.296 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.onConnected:1299 -Session establishment complete on server zk.tbj.com/192.168.85.2:2181, sessionid = 0x16ad4597ef21702, negotiated timeout = 6000
2019-06-12 20:31:19.299 INFO [main-EventThread]org.I0Itec.zkclient.ZkClient.processStateChanged:713 -zookeeper state changed (SyncConnected)
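If you see the log output above, seata-server (the TC) has started. seata-server is in fact a Netty server, and the TM and RM communicate with seata-server (the TC) over Netty.
Integrating seata with Spring Boot
Before getting into how it works, let's walk through a demo to see how seata is used. We integrate with Spring Boot, the most popular framework right now, and use dubbo as the RPC framework. The seata project also provides several other integrations, such as Spring and Spring Cloud; see seata-samples for details.
We go straight to the springboot module. As before, edit its registry.conf file: change type under registry from the default file to zk, and set the zk serverAddr to your own ZooKeeper address. You also need to change the database connection settings and the dubbo settings in application.yml to your own values, for example: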
server:
port: 9999
servlet:
context-path: /demo
spring:
application:
name: seata-springboot-app
http:
encoding:
charset: UTF-8
enabled: true
force: true
datasource:
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
username: root
password: 123456
poolPingConnectionsNotUsedFor: 60000
removeAbandoned: true
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
minIdle: 1
validationQuery: SELECT 1 FROM DUAL
initialSize: 5
maxWait: 60000
poolPreparedStatements: false
filters: stat,wall
testOnBorrow: false
testWhileIdle: true
minEvictableIdleTimeMillis: 300000
timeBetweenEvictionRunsMillis: 60000
testOnReturn: false
maxActive: 50
druid:
user: admin
password: admin
jpa:
hibernate:
ddl-auto: none
show-sql: true
dubbo:
server: true
registry: zookeeper://zk.tbj.com:2181
provider:
port: 20999
Now look at the database configuration class, DruidConfiguration:
/**
* The type Druid configuration.
*/
@Configuration
public class DruidConfiguration {
@Value("${spring.datasource.druid.user}")
private String druidUser;
@Value("${spring.datasource.druid.password}")
private String druidPassword;
/**
* Druid data source druid data source.
*
* @return the druid data source
*/
@Bean(destroyMethod = "close", initMethod = "init")
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
/**
* Data source data source.
*
* @param druidDataSource the druid data source
* @return the data source
*/
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
return dataSourceProxy;
}
/**
* Registers a StatViewServlet (the Druid monitoring console).
*
* @return servlet registration bean
*/
@Bean
public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
ServletRegistrationBean<StatViewServlet> servletRegistrationBean = new ServletRegistrationBean<StatViewServlet>(
new StatViewServlet(), "/druid/*");
servletRegistrationBean.addInitParameter("loginUsername", druidUser);
servletRegistrationBean.addInitParameter("loginPassword", druidPassword);
servletRegistrationBean.addInitParameter("resetEnable", "false");
return servletRegistrationBean;
}
/**
* Registers a WebStatFilter.
*
* @return filter registration bean
*/
@Bean
public FilterRegistrationBean<WebStatFilter> druidStatFilter() {
FilterRegistrationBean<WebStatFilter> filterRegistrationBean = new FilterRegistrationBean<WebStatFilter>(
new WebStatFilter());
// URL patterns the filter applies to.
filterRegistrationBean.addUrlPatterns("/*");
// Resources to exclude from filtering (static files and the Druid console).
filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
return filterRegistrationBean;
}
}
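The first two beans are the important ones: druidDataSource() and dataSource(DruidDataSource druidDataSource). The last two configure the Druid monitoring console. dataSource(DruidDataSource druidDataSource) wraps the generated druidDataSource object in a DataSourceProxy, so that every connection the application obtains goes through seata's proxy.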
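Why wrap the data source at all? A proxy sitting in front of the real connection is what allows extra bookkeeping to happen around the business SQL. The sketch below shows that idea only. Conn, RealConn, and ConnProxy are hypothetical stand-ins (Conn is a two-method substitute for java.sql.Connection), and the "undo_log" statements are plain strings rather than anything seata actually emits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a connection proxy; not seata's DataSourceProxy. */
class ProxySketch {

    /** Stand-in for java.sql.Connection, reduced to two calls. */
    interface Conn {
        void execute(String sql);
        void commit();
    }

    /** The "real" connection: it just records what reaches the database. */
    static class RealConn implements Conn {
        final List<String> sent = new ArrayList<>();

        public void execute(String sql) { sent.add(sql); }

        public void commit() { sent.add("COMMIT"); }
    }

    /** Wrapper that adds undo bookkeeping around the delegate. */
    static class ConnProxy implements Conn {
        private final Conn target;
        private final List<String> pendingUndo = new ArrayList<>();

        ConnProxy(Conn target) {
            this.target = target;
        }

        public void execute(String sql) {
            // Remember that this statement needs an undo record, then run it.
            pendingUndo.add("undo for: " + sql);
            target.execute(sql);
        }

        public void commit() {
            // Write the undo records in the same local transaction as the
            // business SQL, so both are committed (or lost) together.
            for (String undo : pendingUndo) {
                target.execute("INSERT INTO undo_log /* " + undo + " */");
            }
            pendingUndo.clear();
            target.commit();
        }
    }

    /** Runs one update through the proxy and returns what the database saw. */
    static List<String> demo() {
        RealConn real = new RealConn();
        Conn conn = new ConnProxy(real);
        conn.execute("UPDATE t_asset SET amount = 2 WHERE id = 'DF001'");
        conn.commit();
        return real.sent;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The business code only ever sees the Conn interface, which is why no application code has to change when the proxy is introduced.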
Next, the seata configuration class, SeataConfiguration:
/**
* The type Fescar configuration.
*/
@Configuration
public class SeataConfiguration {
@Value("${spring.application.name}")
private String applicationId;
/**
* Registers the GlobalTransactionScanner.
*
* @return global transaction scanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
"my_test_tx_group");
return globalTransactionScanner;
}
}
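This registers a GlobalTransactionScanner, which scans for the @GlobalTransactional annotation.
Now a quick look at the business logic. To assign an asset, we update the status of the asset assignment record for a given asset id, then call the dubbo service assetService.increase() to add 1 to the asset's amount.
Here is the key code, AssignServiceImpl: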
/**
* The type Assign service.
*/
@Service
public class AssignServiceImpl implements AssignService {
private static final Logger LOGGER = LoggerFactory.getLogger(AssignServiceImpl.class);
@Autowired
private AssignRepository assignRepository;
@Reference(check = false)
private io.seata.samples.springboot.service.AssetService assetService;
@Override
@Transactional
@GlobalTransactional
public AssetAssign increaseAmount(String id) {
LOGGER.info("Assign Service Begin ... xid: " + RootContext.getXID() + "\n");
// This update is covered by the local @Transactional transaction, so seata does not have to handle it
AssetAssign assetAssign = assignRepository.findById(id).get();
assetAssign.setStatus("2");
assignRepository.save(assetAssign);
// remote call asset service
assetService.increase();
return assetAssign;
}
}
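Both @Transactional and @GlobalTransactional are used here. The first opens a local transaction around the increaseAmount method, and the second opens a global transaction managed by seata. Note that because increaseAmount is covered by @Transactional, the update to the t_asset_assign table is committed or rolled back by the local transaction itself, so seata does not need to manage it. Seata therefore only has to manage the branch transaction of the remote call assetService.increase().
The remote method assetService.increase():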
/**
* The type Asset service.
*/
@Service(interfaceClass = AssetService.class, timeout = 10000)
@Component
public class AssetServiceImpl implements AssetService {
/**
* The constant LOGGER.
*/
public static final Logger LOGGER = LoggerFactory.getLogger(AssetService.class);
/**
* The constant ASSET_ID.
*/
public static final String ASSET_ID = "DF001";
@Autowired
private AssetRepository assetRepository;
@Override
public int increase() {
LOGGER.info("Asset Service Begin ... xid: " + RootContext.getXID() + "\n");
Asset asset = assetRepository.findById(ASSET_ID).get();
asset.setAmount(asset.getAmount().add(new BigDecimal("1")));
assetRepository.save(asset);
throw new RuntimeException("test exception for seata, your transaction should be rollbacked,asset=" + asset);
}
}
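After updating the Asset, the method throws a RuntimeException to force a rollback.
Before running the program, note the current data in the t_asset_assign table and the t_asset table.
Then run the springboot module's main class, SeataSpringbootApp, and open http://localhost:9999/demo/asset/assign in a browser. The output is: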
RuntimeException: test exception for seata, your transaction should be rollbacked,
asset=Asset{id='DF001', amount=2, voucherCode='e2d1c4512d554db9ae4a5f30cbc2e4b1'}
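The exception message is displayed, which means the program ran all the way to the last line. Now check whether the data changed in the t_asset_assign table and the t_asset table. The data is unchanged, so the distributed transaction worked. Exciting, isn't it? With only a little configuration, seata lets the program manage a distributed transaction as if it were a local one, and most importantly with zero intrusion into business code. For programmers who like to save effort, that is very good news.
How seata works
Now for the main topic of this article: how does seata implement distributed transactions?
All we did was add seata's @GlobalTransactional annotation, and the distributed transaction worked. Under the hood, seata adds an interceptor dedicated to methods annotated with @GlobalTransactional, namely GlobalTransactionalInterceptor, and the whole distributed transaction flow is driven from there: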
/**
* The type Global transactional interceptor.
*/
public class GlobalTransactionalInterceptor implements MethodInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
private final FailureHandler failureHandler;
/**
* Instantiates a new Global transactional interceptor.
*
* @param failureHandler the failure handler
*/
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
if (null == failureHandler) {
failureHandler = DEFAULT_FAIL_HANDLER;
}
this.failureHandler = failureHandler;
}
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
return globalLockTemplate.execute(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
return methodInvocation.proceed();
} catch (Throwable e) {
if (e instanceof Exception) {
throw (Exception)e;
} else {
throw new RuntimeException(e);
}
}
}
});
}
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public int timeout() {
return globalTrxAnno.timeoutMills();
}
@Override
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
}
}
}
private <T extends Annotation> T getAnnotation(Method method, Class<T> clazz) {
if (method == null) {
return null;
}
return method.getAnnotation(clazz);
}
private String formatMethod(Method method) {
String paramTypes = Arrays.stream(method.getParameterTypes())
.map(Class::getName)
.reduce((p1, p2) -> String.format("%s, %s", p1, p2))
.orElse("");
return method.getName() + "(" + paramTypes + ")";
}
}
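The dispatch in invoke (check for the annotation, otherwise just proceed) can be reproduced in miniature with a JDK dynamic proxy. The sketch below is an assumption-laden illustration, not how seata wires things up: seata uses Spring AOP's MethodInterceptor as shown above, while this example uses java.lang.reflect.Proxy, a hypothetical @DemoGlobalTx annotation, and a hypothetical DemoService interface, and "begin/commit/rollback" are just strings appended to a list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical annotation-driven interceptor; not seata's implementation. */
class InterceptorSketch {

    /** Stand-in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTx {
    }

    interface DemoService {
        String assign(String id);
        String ping();
    }

    static class DemoServiceImpl implements DemoService {
        @DemoGlobalTx
        public String assign(String id) {
            return "assigned " + id;
        }

        public String ping() {
            return "pong";
        }
    }

    /** Wraps the target so that annotated methods run inside begin/commit/rollback. */
    static DemoService wrap(DemoService target, List<String> events) {
        return (DemoService) Proxy.newProxyInstance(
            DemoService.class.getClassLoader(),
            new Class<?>[] {DemoService.class},
            (proxy, method, args) -> {
                // Look the annotation up on the implementation method, not the interface.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                try {
                    if (!impl.isAnnotationPresent(DemoGlobalTx.class)) {
                        return method.invoke(target, args); // plain call, no transaction
                    }
                    events.add("begin");
                    Object result = method.invoke(target, args);
                    events.add("commit");
                    return result;
                } catch (InvocationTargetException e) {
                    if (impl.isAnnotationPresent(DemoGlobalTx.class)) {
                        events.add("rollback");
                    }
                    throw e.getCause(); // rethrow the original business exception
                }
            });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        DemoService service = wrap(new DemoServiceImpl(), events);
        System.out.println(service.ping());
        System.out.println(service.assign("A1"));
        System.out.println(events);
    }
}
```

Only the annotated method picks up the begin/commit bracket; ping() passes straight through, which mirrors the final else branch of invoke.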
The main logic lives in the TransactionalTemplate#execute method:
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
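The overall flow is:
- Get the global transaction: look it up in the ThreadLocal first; if there is none, create a DefaultGlobalTransaction.
- Begin the global transaction, tx.begin(business.timeout(), business.name()): the global transaction is opened through the begin method of DefaultTransactionManager. DefaultTransactionManager handles the communication between the TM and the TC, sending the begin, commit, and rollback commands. When the TC receives the begin command from the TM, it returns a globally unique XID to the TM.
- Run the business code, business.execute(): each local transaction generates a branch transaction id (BranchId), builds an undoLog from the images of the data before and after the business SQL runs, and commits the undoLog together with the business SQL.
- Roll back the global transaction, tx.rollback(): any exception thrown while the business code runs triggers a global rollback. The undoLog is looked up by XID and BranchId, a reverse SQL statement is generated from it and executed, and the undoLog record is then deleted.
- Commit the global transaction, tx.commit(): if the business code completes normally, the global transaction is committed. The branch transactions have already committed by this point, so only the undoLog needs to be deleted.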
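The rollback step in the list above, turning an undoLog entry back into SQL, can be sketched as follows. This is a simplified illustration under stated assumptions: UndoRecord and reverseSql are hypothetical names, only UPDATE statements are handled, and values are concatenated into the SQL string for readability, whereas real code would use prepared statements. It is not the format seata stores in its undo log table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical before-image based undo; not seata's undo log format. */
class UndoLogSketch {

    /** One undo entry: the row's identity plus its column values before the update. */
    static class UndoRecord {
        final String xid;
        final long branchId;
        final String table;
        final String pkColumn;
        final String pkValue;
        final Map<String, String> beforeImage;

        UndoRecord(String xid, long branchId, String table,
                   String pkColumn, String pkValue, Map<String, String> beforeImage) {
            this.xid = xid;
            this.branchId = branchId;
            this.table = table;
            this.pkColumn = pkColumn;
            this.pkValue = pkValue;
            this.beforeImage = beforeImage;
        }
    }

    /** Builds the compensating UPDATE that restores the before image. */
    static String reverseSql(UndoRecord record) {
        StringBuilder assignments = new StringBuilder();
        for (Map.Entry<String, String> column : record.beforeImage.entrySet()) {
            if (assignments.length() > 0) {
                assignments.append(", ");
            }
            assignments.append(column.getKey()).append(" = '").append(column.getValue()).append("'");
        }
        return "UPDATE " + record.table + " SET " + assignments
            + " WHERE " + record.pkColumn + " = '" + record.pkValue + "'";
    }

    /** Undo entry for the demo's asset update (amount 1 -> 2); xid and branch id are made up. */
    static UndoRecord sample() {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("amount", "1");
        return new UndoRecord("demo-xid", 1L, "t_asset", "id", "DF001", before);
    }

    public static void main(String[] args) {
        // prints: UPDATE t_asset SET amount = '1' WHERE id = 'DF001'
        System.out.println(reverseSql(sample()));
    }
}
```

Because the undo entry is committed in the same local transaction as the business SQL, a branch that committed always has the information needed to reverse itself later.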
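To close, we summarize the interaction flow between the TC, TM, and RM, building on the official diagram:
[Figure: TC, TM, and RM interaction flow, based on the official seata diagram]
That covers the overall flow of TransactionalTemplate#execute. For reasons of length I have not gone deeper here; a later article will cover the TM workflow in detail, so stay tuned.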