深度剖析Seata源码
@[toc]
# 写在文章开头
本文将针对seata分布式事务注册到提交回滚的全流程进行深入分析和讲解,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 如何使用源码
需要了解的是,这篇文章是基于笔者相对早期的项目作为样例进行讲解,所以对应的seata版本为1.4.2(核心部分实现大体是一样的),建议读者阅读本文在调试源码时可以选择和笔者相同的版本进行理解学习,对应的下载地址为:https://github.com/apache/incubator-seata/tree/v1.4.2 (opens new window)
完成下载后,为保证编译可以通过我们还需要将seata-serializer-protobuf模块移除掉,该模块的位置如下图所示:

同时seata的启动类位于seata-server模块,所以我们需要将该模块的registry.conf的配置改为自己的配置:

以笔者为例,seata配置都是通过nacos进行统一管理的,所以对应的配置类型也都是针对nacos维度去协调适配,大体配置如下所示:
registry {
# 将seata注册到nacos上
type = "nacos"
nacos {
# nacos地址
serverAddr = "ip:8848"
# 命名空间id
namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
# 组名
group = "DEFAULT_GROUP"
# 集群节点名称
cluster = "default"
}
}
config {
# 通过nacos获取配置
type = "nacos"
nacos {
serverAddr = "ip:8848"
namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
group = "DEFAULT_GROUP"
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
经过这几个步骤后seata就可以像我们日常一样的方式进行使用了。
# 基于AT模式详解Seata全链路流程
# seata服务端启动
我们先从seata的服务端启动开始,seata服务端启动时会进行如下几个核心步骤:
- 创建工作线程池
workingThreads。 - 基于工作线程池创建一个
Netty服务端对外提供服务。 - 基于该服务端创建的一个默认的协调者
DefaultCoordinator管理全局事务。 - 默认协调者初始化几个定时任务处理一些异步的全局事务提交、回滚、超时监测的任务。

对应的我们给出这块逻辑的核心入口代码,即位于Server的主函数入口的main方法,可以看到seata服务端的创建是基于netty完成的,完成创建和初始化之后就与协调者coordinator进行绑定:
public static void main(String[] args) throws IOException {
//......
//创建工作线程池处理业务请求
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
//基于该线程池初始化 seata 服务端
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
//......
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());
//初始化协调者,处理seata服务端收到的各种事务读写请求
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
//初始化各种异步定时任务:全局事务提交、全局事务回滚、超时监测等
coordinator.init();
//将协调者作为seata服务端的处理器
nettyRemotingServer.setHandler(coordinator);
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
对应的我们也给出默认协调者的初始化源码,即DefaultCoordinator的init方法,可以看到这段代码本质上就是提交一些定时任务处理全局事务提交、回滚、超时监测、undo log删除等:
public void init() {
//每秒执行,处理需要回滚的分布式事务
retryRollbacking.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryRollbackingLock();
if (lock) {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
} finally {
SessionHolder.unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//......
//异步定时提交全局事务的定时任务,每秒执行一次
asyncCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.asyncCommittingLock();
if (lock) {
try {
//扫描获取各种异步提交的全局事务
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
} finally {
SessionHolder.unAsyncCommittingLock();
}
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 本地服务如何基于GlobalTransaction注解开启事务
我们都知道seata也是基于spring boot实现的,所以我们可以大胆的认为应用端使用GlobalTransaction开启分布式事务本质上也是和spring boot自动装配有着一定的联系。
所以我们从seata-spring-boot-starter这个脚手架的源码包的spring.factories文件入手,可以看到一个SeataAutoConfiguration的注入:

于是我们就可以看到一个GlobalTransactionScanner即一个关于GlobalTransaction注解扫描的类:
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
//......
//扫描我们的配置文件中配置的applicationId、txServiceGroup对应的事务
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
2
3
4
5
6
7
8
查看GlobalTransactionScanner源码我们可以看到该类型继承了spring的初始化bean并设置属性后的拓展点InitializingBean的afterPropertiesSet方法,该方法内部会初始化当前seata客户端,分别初始化TM客户端(使用GlobalTransaction注解的方法的服务即做为TM)和RM客户端处理其他TM或者RM服务端发送的消息,它们初始化的工作分别是:
TM客户端会注册各种TC消息响应的处理器,处理各种seata server对应的TC响应的消息,例如:全局事务开启结果处理器、全局事务提交处理器、全局事务回滚处理器等。RM客户端则是注册一些各种seata server对应TC请求消息的处理器,例如:分支事务提交、分支事务回滚、分支事务undo.log删除等。

对应我们给出GlobalTransactionScanner的afterPropertiesSet源码可以看到客户端初始化这段调用的入口,可以看到启动时某个线程完成CAS上锁初始化标识之后,即通过initClient初始化客户端:
@Override
public void afterPropertiesSet() {
//......
//基于扩展点进行客户端初始化
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
2
3
4
5
6
7
8
步入后即可看到对于TM和RM客户端的初始化调用:
private void initClient() {
//......
// 初始化TM客户端
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
//......
// 初始化RM客户端
RMClient.init(applicationId, txServiceGroup);
//......
}
2
3
4
5
6
7
8
9
此时我们先看看TM客户端内部的处理函数即位于TmNettyRemotingClient的registerProcessor即可看到上述所说的TC响应消息处理器的绑定步骤,即:
- 注册TC响应消息处理器
- 注册全局事务开启响应处理器
- 注册全局事务提交响应处理器
- 注册心跳消息处理器
private void registerProcessor() {
// 1.registry TC response processor 注册一些TC响应消息的处理器
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
//全局事务开启结果响应处理器
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
//全局事务提交响应处理器
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
// 2. 注册心跳消息
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
同理我们也给出RM客户端内部初始化的调用RmNettyRemotingClient的registerProcessor方法:
- 注册分支事务提交消息处理器
- 注册rm客户端对应的分支事务提及和回滚处理器
- 注册undo Log删除处理器
- 注册TC响应消息处理器
- 注册心跳处理器
private void registerProcessor() {
// 1. 注册分支事务提交消息处理器
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2.注册rm客户端对应的分支事务回滚处理器
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 3. 注册undo log删除处理器
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4. 注册TC响应消息处理器
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
// 5.注册心跳消息处理器
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
同时GlobalTransactionScanner继承了AbstractAutoProxyCreator的wrapIfNecessary,该代理类会在spring容器中的bean进行检查并决定是否进行动态代理。以我们的GlobalTransactionScanner逻辑它本质上就是:
- 检查当前
bean是否有GlobalTransactional这个注解 - 如果有则基于全局事务拦截器对其进行增强
对应核心逻辑如下所示,可以看到这段代码会通过existsAnnotation检查当前bean是否存在GlobalTransactional注解,如果有则基于globalTransactionalInterceptor 对其进行增强:
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
//......
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//......
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断是否有GlobalTransaction注解,如果有则为其生成分布式事务的动态代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//如果拦截器为空则初始化拦截器
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
//......
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//基于上一步的interceptor为其生成动态代理
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
//......
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
这也就意味着我们调用带有GlobalTransactional注解方法时,就会走到GlobalTransactionalInterceptor的增强逻辑上,它会走到GlobalTransactionalInterceptor的invoke方法上,最终会走到事务模板类transactionalTemplate的execute方法,该方法会执行如下三个核心步骤:
- 开启全局事务。
- 执行原始业务逻辑。
- 根据各个分支事务结果提交或者回滚事务。

对应的我们给出GlobalTransactionalInterceptor的invoke方法,可以看到当该方法认为注解存在的情况下会直接调用handleGlobalTransaction开启并处理全局事务:
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//......
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取GlobalTransactional注解信息
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
//......
if (!localDisable) {
//若全局事务注解不为空则调用handleGlobalTransaction处理全局事务
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
//......
}
}
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
步入其内部就会走到transactionalTemplate的execute方法,即可看到对于:
- 分支事务的创建
- 告知TC请求开启全局事务
- 执行本地事务
- 全局提交或者回滚
对应逻辑的源码如下所示,读者可结合说明了解:
public Object execute(TransactionalExecutor business) throws Throwable {
//......
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//如果tx为空则以全局事务启动者的身份创建一个全新的事务
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//向TC发送请求开启全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
//执行业务逻辑(被代理的原始方法)
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
//全局事务回滚
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
//分支事务执行成功,提交全局事务
commitTransaction(tx);
return rs;
} finally {
//......
}
} finally {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 客户端如何开启分布式事务
上文调用分布式事务的方法时内部会走到的代理的transactionalTemplate的execute方法,其内部有个beginTransaction就是开启分布式事务的关键,由上文可知作为GlobalTransactional注解的方法对对应的服务就是作为TM即transaction manager,所以在调用beginTransaction时,这个方法的代理就会以TM的身份发送一个请求告知TC自己要开启一个全局事务,TC经过自己的协调处理后(后文会介绍流程)返回一份xid告知TM开启成功:

对应的我们查看seata客户端对应TransactionalTemplate的beginTransaction方法即可看到begin方法的调用,该方法回告知seata服务端自己要开启一个全局事务:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//......
//开始分布式事务
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//......
} catch (TransactionException txe) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
查看begin内部就是通过TM发起请求,得到xid并缓存到当前线程内部,开始后续的执行流程分布式事务处理流程:
@Override
public void begin(int timeout, String name) throws TransactionException {
//......
//通过TM告知TC开启全局事务,从而得到xid
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//将xid缓存到当前线程的缓存中
RootContext.bind(xid);
//......
}
2
3
4
5
6
7
8
9
10
# seata服务端如何注册全局事务
基于上述请求,对应seata server端的TC收到请求后会基于传参中的消息标信息,定位到对应的执行器即TM消息处理器,然后驱动TM处理器将这个请求生成一份全局session信息从而构成本次请求的全局事务信息,再将请求写入数据表中:

我们给出TC处理消息的代码入口AbstractNettyRemotingServer的channelRead方法,从名字不难看出TC服务端也是基于netty实现,其内部通过processMessage处理各种消息:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
//基于netty编写的服务端,channelRead通过processMessage处理客户端各种请求
processMessage(ctx, (RpcMessage) msg);
}
2
3
4
5
6
步入processMessage即可看到基于处理表定位消息并交由处理器处理消息逻辑pair.getFirst().process(ctx, rpcMessage);:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
//......
//获取网络消息
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
//通过处理表定位到对应的处理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
//基于第一个处理器处理当前消息
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
//......
} finally {
//......
}
});
} catch (RejectedExecutionException e) {
//......
}
} else {
//......
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
因为我们的消息是TM发来的,所以上一步的处理器是ServerOnRequestProcessor的:
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (ChannelManager.isRegistered(ctx.channel())) {
//处理TM客户端发送来的消息
onRequestMessage(ctx, rpcMessage);
} else {
//......
}
}
2
3
4
5
6
7
8
9
最终走到GlobalBeginRequest这个工具的handle基于协调者将事务信息写入global_table从而得到xid返回给TM客户端:
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//生成全局事务信息并得到xid将数据写入响应返回给TM
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
//.......
}
2
3
4
5
6
7
8
# RM和TC如何协调处理分支事务
完成全局事务的注册管理之后,我们再来聊聊各个分支事务的执行和提交回滚,上文提及,seata原生我们本地的jdbc数据库连接通过代理加以封装,所以在我们seata客户端执行本地事务完成后提交的commit方法是经过了seata的代理这一层,该连接代理在调用commit方法时,其内部就会通过RM向TC注册一个分支事务的请求,TC收到请求后会执行如下工作:
- 基于
lock_table尝试为事务生成全局锁。 - 分支事务信息写入到
branch_table表上并返回branch_id给RM:

我们给出ConnectionProxy的commit方法入口,其内部调用了一个doCommit方法,它就是事务提交的核心逻辑:
@Override
public void commit() throws SQLException {
try {
//excute会调用doCommit生成undoLog缓存和执行分支事务
LOCK_RETRY_POLICY.execute(() -> {
//excuete执行成功后这一步会注册分支事务并提交本地事务和undoLog镜像以保证原子性
doCommit();
return null;
});
} catch (SQLException e) {
//......
} catch (Exception e) {
throw new SQLException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
其内部调用ConnectionProxy的doCommit会调用processGlobalTransactionCommit执行分支事务:
private void doCommit() throws SQLException {
//如果处于全局事务中则调用processGlobalTransactionCommit
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
//......
} else {
//......
}
}
2
3
4
5
6
7
8
9
10
最终就可以在processGlobalTransactionCommit看到如下逻辑:
register这个注册分支事务的逻辑,TC基于RM给定的resourceId信息,生成操作数据的全局锁,并插入分支事务信息到brach_table中。undo日志刷盘到本地undo日志中。- 本地业务的事务提交。
private void processGlobalTransactionCommit() throws SQLException {
try {
//向TC发起请求注册分支事务,TC基于RM给定的resourceId生成全局锁并插入分支事务信息到brach_table后就不会抛异常
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//undo日志刷盘
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//本地事务提交
targetConnection.commit();
} catch (Throwable ex) {
//......
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这里我们着重看一下register函数,其内部本质上就是通过RM客户端告知TC自己准备执行分支事务提交,帮我上一把全局锁并注册分支事务:
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
//向tc发起请求并获得register
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
//缓存到当前线程中
context.setBranchId(branchId);
}
2
3
4
5
6
7
8
9
10
最后这个注册的逻辑就会来到AbstractResourceManager的branchRegister上,可以看到它会携带着全局事务id和主键等数据发送请求给TC:
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
try {
BranchRegisterRequest request = new BranchRegisterRequest();
//传入全局事务id即xid
request.setXid(xid);
//基于当前数据主键生成lockkeys
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);
//基于RM的netty客户端将其异步发送
BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
//......
return response.getBranchId();
} catch (TimeoutException toe) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# seata服务端处理分支事务请求
TC处理流程与上述文章同理,收到消息后基于request中的消息表定位到对应的处理器,我们这里最终会走到BranchRegisterRequest的处理器上,通过AbstractTCInboundHandler注册分支事务:
@Override
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
BranchRegisterResponse response = new BranchRegisterResponse();
exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
@Override
public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
throws TransactionException {
try {
//tc注册分支事务入口
doBranchRegister(request, response, rpcContext);
} catch (StoreException e) {
//......
}
}
}, request, response);
return response;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
最终这段逻辑就会走到AbstractCore的branchRegister,大体执行的步骤是:
- 生成分支事务
session - 尝试获得数据全局锁
lock_table - 取锁成功将分支事务信息写入
branch_table - 返回
branch_id给RM

对应源码逻辑如下,大体逻辑就说基于分支事务session生成全局锁存到lock_table后,将分支事务信息存到branch_table中:
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
//......
return SessionHolder.lockAndExecute(globalSession, () -> {
//......
//获取分支事务的表信息并将其写入到lock_table中意味获得全局锁,上锁失败会抛异常
branchSessionLock(globalSession, branchSession);
try {
//添加分支事务信息到branch_table中
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
//......
}
//......
//返回分支事务id
return branchSession.getBranchId();
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
TC返回成功后,RM就会执行undo日志刷盘和本地事务提交,详情参考我们本节代码processGlobalTransactionCommit方法,这里不贴出了。
# RM生成回滚日志
对于java程序而言大部分SQL操作底层都是基于Executor执行器操作的,在上述代理执行commit方法前,seata底层将代理的连接即上文的connectionProxy通过AbstractDMLBaseExecutor执行SQL操作,该执会针对我们的连接代理进行如下逻辑处理:
- 判断连接代理
connectionProxy是否是自动提交,若非自动提交则调用executeAutoCommitFalse方法,该方法会生成undoLog数据写入缓存,然后将RM当执行分支事务SQL,基于该执行结果生成后置镜像,最后将undo日志写入undo_log表中。 - 若开启自动提交则关闭自动提交后,复用
executeAutoCommitFalse方法执行系统的undoLog和分支事务SQL的执行操作。
对应源码的整体工作链路图如下所示:

这里我们直接给出AbstractDMLBaseExecutor的doExecute方法作为入口,可以看到若开启自动提交则调用executeAutoCommitTrue,反之调用executeAutoCommitFalse:
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//若自动提交则关闭自动提交,并生成undo信息存入缓冲区
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
//直接生成undo log镜像写入缓存
return executeAutoCommitFalse(args);
}
}
2
3
4
5
6
7
8
9
10
11
因为都会复用executeAutoCommitFalse这段逻辑,所以我们直接查看这个方法的逻辑,可以看到该逻辑内部会基于分支事务前后的数据生成前置和后置镜像:
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
//基于分支事务的SQL定位操作前的SQL生成前置镜像
TableRecords beforeImage = beforeImage();
//执行分支事务的SQL
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//生成分支事务操作后置镜像
TableRecords afterImage = afterImage(beforeImage);
//将undoLog写入缓冲区
prepareUndoLog(beforeImage, afterImage);
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 事务全局提交与回滚
TransactionalTemplate(即TM)驱动各种分支事务准备成功后,就会执行commitTransaction提交全局事务,对应的代码位于TransactionalTemplate的execute方法,该方法会通知TC驱动全局事务提交,而TC收到该请求之后,就会驱动各个分支事务提交事务,每个分支事务收到该请求后就会删除undoLog并提交各自未提交的事务:

public Object execute(TransactionalExecutor business) throws Throwable {
//......
try {
//向TC发送请求开启全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
//执行业务逻辑(被代理的原始方法)
rs = business.execute();
} catch (Throwable ex) {
//全局事务回滚
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
//分支事务执行成功,提交全局事务
commitTransaction(tx);
return rs;
} finally {
//......
}
} finally {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
步入其内部可以看到DefaultGlobalTransaction调用transactionManager即TM提交全局事务:
@Override
public void commit() throws TransactionException {
//......
try {
while (retry > 0) {
try {
//执行全局事务提交
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
//......
}
} finally {
//......
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这个commit的逻辑也很简单,即告知TC要提交全局事务了:
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
//通知TC提交全局事务
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
2
3
4
5
6
7
8
对应的TC收到该请求后,对应的AbstractTCInboundHandler就会调用doGlobalCommit通知各个RM提交全局事务:
@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
throws TransactionException {
try {
//遍历RM提交各个分支事务
doGlobalCommit(request, response, rpcContext);
} catch (StoreException e) {
//......
}
}
//......
//......
}, request, response);
return response;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
对应的我们可以来道该源码内部的DefaultCore的doGlobalCommit方法印证这一点,可以看到该方法会遍历各个分支事务调用branchCommit通知其提交或者回滚事务:
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
//......
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
//遍历全局事务中的分支事务
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
//......
}
try {
//告知RM提交事务
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
//......
} catch (Exception ex) {
//......
}
return CONTINUE;
});
//......
}
//......
return success;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
最后请求达到RM上的DefaultRMHandler按照TC要求提交或者回滚事务:
//RM提交分支事务
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
return getRMHandler(request.getBranchType()).handle(request);
}
//RM回滚分支事务
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
return getRMHandler(request.getBranchType()).handle(request);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
提交事务本质上就是提交后删除undoLog即可,这里我们以分支事务回滚为例,可以看到上述代码BranchRollbackResponse 会调用handle方法执行分支事务回滚,该方法最终会走到AbstractRMHandler的doBranchRollback,该方法会调动RM管理器将分支事务回滚:
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
//......
//回滚分支事务
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
//将xid和处理结果状态响应给TC
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
//......
}
2
3
4
5
6
7
8
9
10
11
12
最终该方法内部就会调用AbstractUndoLogManager的undo解析当前分支事务的前置镜像数据,该方法内部执行逻辑为:
- 定位分支事务的undo日志数据
- 反序列化为undo对象
- 基于该undo对象信息解析出表名、列以及数据等信息。
- 通过undoExecutor 执行器将分支事务还原。
对应源码如下:
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
//......
for (; ; ) {
try {
//......
// Find UNDO LOG
//获取当前分支事务的undo镜像
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
//......
//获取undo数据
byte[] rollbackInfo = getRollbackInfo(rs);
//反序列化生成undo对象 branchUndoLog
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
//遍历undo对象生成SQL还原分支事务值
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
//获取表的表名、列的元信息
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
//获取对应的执行执行器 将对应分支事务的表数据回滚
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
//......
} catch (SQLIntegrityConstraintViolationException e) {
//......
} catch (Throwable e) {
//......
} finally {
//......
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 小结
让我们来做个小结,总的来说seata实现数据库的AT模式分布式事务的流程为:
调用带有
globalTransactional注解的方法执行业务逻辑。该方法以
TM的身份通知TC开启全局事务。TC收到通知后到global_table创建该方法的全局事务信息,这里以笔者某个下单业务的分布式事务场景为例,对应的数据如下所示:
RM开始工作,各自找TC注册分支事务,基于当前数据生成全局锁存入
lock_table,保证当前数据操作时没有其他事务干扰:

全局锁成功后TC将数据存入branch_table表,对应数据如下所示:

- RM完成分支事务注册后,持有本地锁的事务执行本地分支事务,成功后将生成分支数据的前后镜像undo表,如下所示:
这里我们以后置镜像为例子查看账户表修改后的字段值为例,可以看到该镜像将每一个字段的类型、值等信息都序列化为JSON生成undo镜像:

- TM感知到所有分支事务准备成功,通知TC将这些RM(分支事务)提交,即将undoLog删除,反之基于undoLog将数据回滚。
对应我们给出下面这段图,读者可以结合上面源码梳理一下全流程:

我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 参考
Seata AT 模式:https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/ (opens new window)
深入解析Spring Bean的生命周期管理 :https://sharkchili.blog.csdn.net/article/details/120857166 (opens new window)