1. 三种角色
- tc,运行于seata-server,协调事务。每次收到开启事务请求,生成一个xid。
- tm,运行于业务项目,向tc申请开启事务,获得xid。
- rm,运行于业务项目,参与事务,向tc注册分支事务。
2. 一次正常commit流程
2.1. tm向tc申请开启事务、执行业务代码、向tc申请提交事务
- 调用带注解的方法,进入切面
io.seata.tm.api.TransactionalTemplate.execute
-io.seata.tm.api.TransactionalTemplate.beginTransaction # 获取一个xid,然后放到上下文中
--io.seata.tm.api.DefaultGlobalTransaction.begin(int, java.lang.String)
---io.seata.tm.DefaultTransactionManager.begin # 申请开启事务的请求消息GlobalBeginRequest,响应消息GlobalBeginResponse,可以看到response包含的xid
----io.seata.tm.DefaultTransactionManager.syncCall
-----io.seata.core.rpc.netty.AbstractNettyRemotingClient.sendSyncRequest(java.lang.Object)
------io.seata.core.rpc.netty.AbstractNettyRemoting.sendSync # 使用netty调用seata-server,开启事务,获得一个xid
--RootContext.bind(xid);
-business.execute() #执行业务代码
-io.seata.tm.api.TransactionalTemplate.commitTransaction # 提交分布式事务,请求消息GlobalCommitRequest,响应消息GlobalCommitResponse,request包含xid
--io.seata.tm.api.DefaultGlobalTransaction.commit
---io.seata.tm.DefaultTransactionManager.commit # 申请提交分布式事务
----io.seata.tm.DefaultTransactionManager.syncCall
-----io.seata.core.rpc.netty.AbstractNettyRemotingClient.sendSyncRequest(java.lang.Object)
------io.seata.core.rpc.netty.AbstractNettyRemoting.sendSync # 使用netty调用seata-server
2.2 tm给下游组件传递xid
- 使用feign调用其他组件,将xid添加到http-header,key为TX_XID
com.alibaba.cloud.seata.feign.SeataFeignClient.execute
-com.alibaba.cloud.seata.feign.SeataFeignClient.getModifyRequest
openfeign使用这种方式传递xid
RestTemplate使用SeataRestTemplateInterceptor
dubbo使用ApacheDubboTransactionPropagationFilter、AlibabaDubboTransactionPropagationFilter
2.3. rm组件获取http-header中的TX_XID
com.alibaba.cloud.seata.web.SeataHandlerInterceptor.preHandle
-RootContext.bind(rpcXid) #绑定到上下文
2.4. rm执行数据库操作
io.seata.rm.datasource.PreparedStatementProxy.execute
-io.seata.rm.datasource.exec.ExecuteTemplate.execute(io.seata.rm.datasource.StatementProxy<S>, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object...)
--io.seata.rm.datasource.exec.ExecuteTemplate.execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy<S>, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object...)
---io.seata.rm.datasource.sql.SQLVisitorFactory.get # 获取sql识别器
---获取sql执行器,增删改、select for update
----io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute # 从上下文拿到xid,绑定到Connection
-----io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.doExecute # 执行sql
------io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.executeAutoCommitFalse
-------io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.beforeImage # 生成操作前镜像。抽象方法,根据增删改,有不同实现。
-------执行sql
-------io.seata.rm.datasource.exec.AbstractDMLBaseExecutor.afterImage # 生成操作前镜像。抽象方法,根据增删改,有不同实现。
-------io.seata.rm.datasource.exec.BaseTransactionalExecutor.prepareUndoLog # 用前后镜像制作undo_log,用于回滚
2.5. rm提交本地事务
org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit
-io.seata.rm.datasource.ConnectionProxy.commit
--io.seata.rm.datasource.ConnectionProxy.doCommit
---io.seata.rm.datasource.ConnectionProxy.processGlobalTransactionCommit
----io.seata.rm.datasource.ConnectionProxy.register
-----io.seata.rm.DefaultResourceManager.branchRegister
------io.seata.rm.AbstractResourceManager.branchRegister # 向tc注册分支事务,请求消息BranchRegisterRequest,包含xid,lockKeys,resourceId;响应消息BranchRegisterResponse,包含branchId(分支事务id)
----io.seata.rm.datasource.undo.UndoLogManagerFactory.getUndoLogManager
----io.seata.rm.datasource.undo.UndoLogManager.flushUndoLogs # 插入undo_log
----java.sql.Connection.commit # 组件提交本地事务
2.6. rm处理tc的commit消息
io.seata.core.rpc.processor.client.RmBranchCommitProcessor.process
-io.seata.core.rpc.processor.client.RmBranchCommitProcessor.handleBranchCommit # 请求消息BranchCommitRequest,值得注意的是请求消息来自来自tc(seata-server),包含xid,branchId,resourceId。rm处理完成后给tc回复响应消息BranchCommitResponse,包含xid,branchId,BranchStatus。
--io.seata.rm.AbstractRMHandler.onRequest
---io.seata.core.protocol.transaction.BranchCommitRequest.handle
----io.seata.rm.AbstractRMHandler.handle(io.seata.core.protocol.transaction.BranchCommitRequest)
-----io.seata.core.exception.AbstractExceptionHandler.exceptionHandleTemplate
------io.seata.rm.AbstractRMHandler.doBranchCommit
-------io.seata.rm.datasource.DataSourceManager.branchCommit
--------io.seata.rm.datasource.AsyncWorker.branchCommit
---------io.seata.rm.datasource.AsyncWorker.addToCommitQueue # 把要提交的分支事务信息加到一个队列,有定时任务消费该队列。
2.7. rm端定时检查已完成的分支事务的队列,清理undo_log
io.seata.rm.datasource.AsyncWorker.AsyncWorker # 定时调用doBranchCommitSafely,1秒1次
-doBranchCommitSafely
--io.seata.rm.datasource.AsyncWorker.doBranchCommit
---io.seata.rm.datasource.AsyncWorker.dealWithGroupedContexts
----io.seata.rm.datasource.AsyncWorker.deleteUndoLog
-----io.seata.rm.datasource.undo.AbstractUndoLogManager.batchDeleteUndoLog # 清理undo_log
3. 问题
3.1. 为什么带注解@GlobalTransactional的方法能开启分布式事务?
- io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary方法扫描带@GlobalTransactional注解的方法,生成代理。
- 代理方法的拦截器是io.seata.spring.annotation.GlobalTransactionalInterceptor,invoke方法调用handleGlobalTransaction方法,调用TransactionalTemplate.execute
- TransactionalTemplate.execute,在该方法中可以看到
- beginTransaction(txInfo, tx) # tm向tc申请开启事务
- rs = business.execute(); # 执行业务代码
- commitTransaction(tx); # 提交事务
- completeTransactionAfterThrowing(txInfo, tx, ex); # 如果出现异常,执行回滚
3.2. tm开启事务,xid是怎样向下游组件传递的?
调用下游组件时,tm把xid放到了请求中。常见的有http系列和dubbo系列。
3.2.1. http系列
tm把xid放到http-header中,header_name为TX_XID。下游组件从header中获取。
在spring-cloud-starter-alibaba-seata.jar下面有三个包:feign、rest、web
feign对应使用openfeign的tm。
com.alibaba.cloud.seata.feign.SeataFeignClient#getModifyRequest方法中,把xid放到了http_headerrest对应使用RestTemplate的tm。
com.alibaba.cloud.seata.rest.SeataRestTemplateInterceptor#intercept方法中,把xid放到了http_headerweb对应rm
com.alibaba.cloud.seata.web.SeataHandlerInterceptor#preHandle方法中
,获取http_header TX_XID。
3.2.2. dubbo系列
实现了dubbo框架的Filter,获取上游的xid,给下游发送xid,有apache和alibaba两个实现:
- ApacheDubboTransactionPropagationFilter
- AlibabaDubboTransactionPropagationFilter
3.3. rm的数据库操作为什么能产生undo_log?
seata对DataSource、Connection、Statement、PreparedStatement做了代理。
- 使用io.seata.rm.datasource.SeataDataSourceProxy代理DataSource,getConnection方法返回的是io.seata.rm.datasource.AbstractConnectionProxy
- AbstractConnectionProxy#createStatement()返回的是io.seata.rm.datasource.StatementProxy
- AbstractConnectionProxy#prepareStatement()返回的是io.seata.rm.datasource.PreparedStatementProxy
seata-spring-boot-starter - StatementProxy和PreparedStatementProxy执行sql时,调用io.seata.rm.datasource.exec.ExecuteTemplate#execute,该方法内部根据数据库操作生成undo_log
3.4. tc、tm、rm三个角色是什么运行的?
- tc角色运行于seata-server,一般会部署一套seata-server集群。
- tm和rm角色运行于业务组件。
3.5. tm、rm是怎样启动的?怎样和tc通信?
业务组件使用jar包seata-spring-boot-starter.jar装配rm和rm。
也可以不使用seata-spring-boot-starter.jar,自己手工配制seata。但不推荐新手这样做,容易出现疑难杂症。
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration#globalTransactionScanner构造一个GlobalTransactionScanner。
GlobalTransactionScanner#initClient方法调用TMClient.init构造tm,调用RMClient.init构造rm。
-
tm是一个基于netty的tcp client,它从注册中心获取seata-server列表,跟所有的seata-server都建立连接,并保持心跳。它注册了一组io.seata.core.rpc.processor.RemotingProcessor用于处理tc发来的消息。
ClientOnResponseProcessor、ClientHeartbeatProcessor
-
rm跟tm差不多,也是基于netty的tcp client,只是它注册的是另一组RemotingProcessor。
RmBranchCommitProcessor、RmBranchRollbackProcessor、RmUndoLogProcessor、ClientOnResponseProcessor、ClientHeartbeatProcessor
tc是一个基于netty的tcp server,它监听指定的端口,处理tm和rm的连接。它也注册了一组RemotingProcessor。