禅与计算机 禅与计算机
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

sharkchili

计算机禅修者
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 微服务基础

  • Nacos

  • Seata

    • 深度剖析Seata源码
      • 写在文章开头
      • 如何使用源码
      • 基于AT模式详解Seata全链路流程
        • seata服务端启动
        • 本地服务如何基于GlobalTransaction注解开启事务
        • 客户端如何开启分布式事务
        • seata服务端如何注册全局事务
        • RM和TC如何协调处理分支事务
        • seata服务端处理分支事务请求
        • RM生成回滚日志
        • 事务全局提交与回滚
      • 小结
      • 参考
  • Docker部署

  • 分布式微服务
  • Seata
sharkchili
2026-03-25
目录

深度剖析Seata源码

@[toc]

# 写在文章开头

本文将针对seata分布式事务注册到提交回滚的全流程进行深入分析和讲解,希望对你有帮助。

我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。

为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。

# 如何使用源码

需要了解的是,这篇文章是基于笔者相对早期的项目作为样例进行讲解,所以对应的seata版本为1.4.2(核心部分实现大体是一样的),建议读者阅读本文在调试源码时可以选择和笔者相同的版本进行理解学习,对应的下载地址为:https://github.com/apache/incubator-seata/tree/v1.4.2 (opens new window)

完成下载后,为保证编译可以通过我们还需要将seata-serializer-protobuf模块移除掉,该模块的位置如下图所示:

同时seata的启动类位于seata-server模块,所以我们需要将该模块的registry.conf的配置改为自己的配置:

以笔者为例,seata配置都是通过nacos进行统一管理的,所以对应的配置类型也都是针对nacos维度去协调适配,大体配置如下所示:

registry {
  # 将seata注册到nacos上
  type = "nacos"
  nacos {
  # nacos地址
    serverAddr = "ip:8848"
    # 命名空间id
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    # 组名
    group = "DEFAULT_GROUP"
    # 集群节点名称
    cluster = "default"
  }
}
config {
  # 通过nacos获取配置
  type = "nacos"
  nacos {
    serverAddr = "ip:8848"
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    group = "DEFAULT_GROUP"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

经过这几个步骤后seata就可以像我们日常一样的方式进行使用了。

# 基于AT模式详解Seata全链路流程

# seata服务端启动

我们先从seata的服务端启动开始,seata服务端启动时会进行如下几个核心步骤:

  1. 创建工作线程池workingThreads。
  2. 基于工作线程池创建一个Netty服务端对外提供服务。
  3. 基于该服务端创建的一个默认的协调者DefaultCoordinator管理全局事务。
  4. 默认协调者初始化几个定时任务处理一些异步的全局事务提交、回滚、超时监测的任务。

对应的我们给出这块逻辑的核心入口代码,即位于Server的主函数入口的main方法,可以看到seata服务端的创建是基于netty完成的,完成创建和初始化之后就与协调者coordinator进行绑定:

public static void main(String[] args) throws IOException {
        //......
        //创建工作线程池处理业务请求
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        //基于该线程池初始化 seata 服务端
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
 //......
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化协调者,处理seata服务端收到的各种事务读写请求
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        //初始化各种异步定时任务:全局事务提交、全局事务回滚、超时监测等
        coordinator.init();
        //将协调者作为seata服务端的处理器
        nettyRemotingServer.setHandler(coordinator);
       //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

对应的我们也给出默认协调者的初始化源码,即DefaultCoordinator的init方法,可以看到这段代码本质上就是提交一些定时任务处理全局事务提交、回滚、超时监测、undo log删除等:

public void init() {
        //每秒执行,处理需要回滚的分布式事务
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
        //异步定时提交全局事务的定时任务,每秒执行一次
        asyncCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    //扫描获取各种异步提交的全局事务
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 本地服务如何基于GlobalTransaction注解开启事务

我们都知道seata也是基于spring boot实现的,所以我们可以大胆的认为应用端使用GlobalTransaction开启分布式事务本质上也是和spring boot自动装配有着一定的联系。

所以我们从seata-spring-boot-starter这个脚手架的源码包的spring.factories文件入手,可以看到一个SeataAutoConfiguration的注入:

于是我们就可以看到一个GlobalTransactionScanner即一个关于GlobalTransaction注解扫描的类:

 @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
      //......
        //扫描我们的配置文件中配置的applicationId、txServiceGroup对应的事务
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
1
2
3
4
5
6
7
8

查看GlobalTransactionScanner源码我们可以看到该类型继承了spring的初始化bean并设置属性后的拓展点InitializingBean的afterPropertiesSet方法,该方法内部会初始化当前seata客户端,分别初始化TM客户端(使用GlobalTransaction注解的方法的服务即做为TM)和RM客户端处理其他TM或者RM服务端发送的消息,它们初始化的工作分别是:

  1. TM客户端会注册各种TC消息响应的处理器,处理各种seata server对应的TC响应的消息,例如:全局事务开启结果处理器、全局事务提交处理器、全局事务回滚处理器等。
  2. RM客户端则是注册一些各种seata server对应TC请求消息的处理器,例如:分支事务提交、分支事务回滚、分支事务undo.log删除等。

对应我们给出GlobalTransactionScanner的afterPropertiesSet源码可以看到客户端初始化这段调用的入口,可以看到启动时某个线程完成CAS上锁初始化标识之后,即通过initClient初始化客户端:

@Override
    public void afterPropertiesSet() {
        //......
        //基于扩展点进行客户端初始化
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }
1
2
3
4
5
6
7
8

步入后即可看到对于TM和RM客户端的初始化调用:

private void initClient() {
        //......
        // 初始化TM客户端
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
      	//......
        // 初始化RM客户端
        RMClient.init(applicationId, txServiceGroup);
      //......
    }
1
2
3
4
5
6
7
8
9

此时我们先看看TM客户端内部的处理函数即位于TmNettyRemotingClient的registerProcessor即可看到上述所说的TC响应消息处理器的绑定步骤,即:

  1. 注册TC响应消息处理器
  2. 注册全局事务开启响应处理器
  3. 注册全局事务提交响应处理器
  4. 注册心跳消息处理器
private void registerProcessor() {
        // 1.registry TC response processor 注册一些TC响应消息的处理器
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        //全局事务开启结果响应处理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        //全局事务提交响应处理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. 注册心跳消息
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

同理我们也给出RM客户端内部初始化的调用RmNettyRemotingClient的registerProcessor方法:

  1. 注册分支事务提交消息处理器
  2. 注册rm客户端对应的分支事务提及和回滚处理器
  3. 注册undo Log删除处理器
  4. 注册TC响应消息处理器
  5. 注册心跳处理器
private void registerProcessor() {
        // 1. 注册分支事务提交消息处理器
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.注册rm客户端对应的分支事务回滚处理器
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3. 注册undo log删除处理器
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4. 注册TC响应消息处理器
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.注册心跳消息处理器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

同时GlobalTransactionScanner继承了AbstractAutoProxyCreator的wrapIfNecessary,该代理类会在spring容器中的bean进行检查并决定是否进行动态代理。以我们的GlobalTransactionScanner逻辑它本质上就是:

  1. 检查当前bean是否有GlobalTransactional这个注解
  2. 如果有则基于全局事务拦截器对其进行增强

对应核心逻辑如下所示,可以看到这段代码会通过existsAnnotation检查当前bean是否存在GlobalTransactional注解,如果有则基于globalTransactionalInterceptor 对其进行增强:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
              //......
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                   //......
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //判断是否有GlobalTransaction注解,如果有则为其生成分布式事务的动态代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    //如果拦截器为空则初始化拦截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

              //......
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //基于上一步的interceptor为其生成动态代理
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
             //......
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

这也就意味着我们调用带有GlobalTransactional注解方法时,就会走到GlobalTransactionalInterceptor的增强逻辑上,它会走到GlobalTransactionalInterceptor的invoke方法上,最终会走到事务模板类transactionalTemplate的execute方法,该方法会执行如下三个核心步骤:

  1. 开启全局事务。
  2. 执行原始业务逻辑。
  3. 根据各个分支事务结果提交或者回滚事务。

对应的我们给出GlobalTransactionalInterceptor的invoke方法,可以看到当该方法认为注解存在的情况下会直接调用handleGlobalTransaction开启并处理全局事务:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
      //......
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //获取GlobalTransactional注解信息
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //......
            if (!localDisable) {
                //若全局事务注解不为空则调用handleGlobalTransaction处理全局事务
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                     //......
                }
            }
        }
         //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

步入其内部就会走到transactionalTemplate的execute方法,即可看到对于:

  1. 分支事务的创建
  2. 告知TC请求开启全局事务
  3. 执行本地事务
  4. 全局提交或者回滚

对应逻辑的源码如下所示,读者可结合说明了解:

public Object execute(TransactionalExecutor business) throws Throwable {
       //......

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            //如果tx为空则以全局事务启动者的身份创建一个全新的事务
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //向TC发送请求开启全局事务
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    //执行业务逻辑(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    //全局事务回滚
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //分支事务执行成功,提交全局事务
                commitTransaction(tx);

                return rs;
            } finally {
             //......
            }
        } finally {
         //......
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 客户端如何开启分布式事务

上文调用分布式事务的方法时内部会走到的代理的transactionalTemplate的execute方法,其内部有个beginTransaction就是开启分布式事务的关键,由上文可知作为GlobalTransactional注解的方法对对应的服务就是作为TM即transaction manager,所以在调用beginTransaction时,这个方法的代理就会以TM的身份发送一个请求告知TC自己要开启一个全局事务,TC经过自己的协调处理后(后文会介绍流程)返回一份xid告知TM开启成功:

对应的我们查看seata客户端对应TransactionalTemplate的beginTransaction方法即可看到begin方法的调用,该方法回告知seata服务端自己要开启一个全局事务:

 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //......
            //开始分布式事务
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
         //......
        } catch (TransactionException txe) {
            //......

        }
    }
1
2
3
4
5
6
7
8
9
10
11

查看begin内部就是通过TM发起请求,得到xid并缓存到当前线程内部,开始后续的执行流程分布式事务处理流程:

@Override
    public void begin(int timeout, String name) throws TransactionException {
       	//......
       	//通过TM告知TC开启全局事务,从而得到xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //将xid缓存到当前线程的缓存中
        RootContext.bind(xid);
       	//......
    }
1
2
3
4
5
6
7
8
9
10

# seata服务端如何注册全局事务

基于上述请求,对应seata server端的TC收到请求后会基于传参中的消息标信息,定位到对应的执行器即TM消息处理器,然后驱动TM处理器将这个请求生成一份全局session信息从而构成本次请求的全局事务信息,再将请求写入数据表中:

我们给出TC处理消息的代码入口AbstractNettyRemotingServer的channelRead方法,从名字不难看出TC服务端也是基于netty实现,其内部通过processMessage处理各种消息:

  @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  
            //基于netty编写的服务端,channelRead通过processMessage处理客户端各种请求
            processMessage(ctx, (RpcMessage) msg);
        }
1
2
3
4
5
6

步入processMessage即可看到基于处理表定位消息并交由处理器处理消息逻辑pair.getFirst().process(ctx, rpcMessage);:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
         //......
         //获取网络消息
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //通过处理表定位到对应的处理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //基于第一个处理器处理当前消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                //......
                            } finally {
                                //......
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //......
                        
                    }
                } else {
                //......
               }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

因为我们的消息是TM发来的,所以上一步的处理器是ServerOnRequestProcessor的:

 @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
	        //处理TM客户端发送来的消息
            onRequestMessage(ctx, rpcMessage);
        } else {
           //......
        }
    }
1
2
3
4
5
6
7
8
9

最终走到GlobalBeginRequest这个工具的handle基于协调者将事务信息写入global_table从而得到xid返回给TM客户端:

 @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
		//生成全局事务信息并得到xid将数据写入响应返回给TM
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
       //.......
    }
1
2
3
4
5
6
7
8

# RM和TC如何协调处理分支事务

完成全局事务的注册管理之后,我们再来聊聊各个分支事务的执行和提交回滚,上文提及,seata原生我们本地的jdbc数据库连接通过代理加以封装,所以在我们seata客户端执行本地事务完成后提交的commit方法是经过了seata的代理这一层,该连接代理在调用commit方法时,其内部就会通过RM向TC注册一个分支事务的请求,TC收到请求后会执行如下工作:

  1. 基于lock_table尝试为事务生成全局锁。
  2. 分支事务信息写入到branch_table表上并返回branch_id给RM:

我们给出ConnectionProxy的commit方法入口,其内部调用了一个doCommit方法,它就是事务提交的核心逻辑:

  @Override
    public void commit() throws SQLException {
        try {
        	//excute会调用doCommit生成undoLog缓存和执行分支事务
            LOCK_RETRY_POLICY.execute(() -> {
                //excuete执行成功后这一步会注册分支事务并提交本地事务和undoLog镜像以保证原子性
                doCommit();
                return null;
            });
        } catch (SQLException e) {
           //......
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

其内部调用ConnectionProxy的doCommit会调用processGlobalTransactionCommit执行分支事务:

    private void doCommit() throws SQLException {
        //如果处于全局事务中则调用processGlobalTransactionCommit
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
         //......
        } else {
           //......
        }
    }
1
2
3
4
5
6
7
8
9
10

最终就可以在processGlobalTransactionCommit看到如下逻辑:

  1. register这个注册分支事务的逻辑,TC基于RM给定的resourceId信息,生成操作数据的全局锁,并插入分支事务信息到brach_table中。
  2. undo日志刷盘到本地undo日志中。
  3. 本地业务的事务提交。
private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向TC发起请求注册分支事务,TC基于RM给定的resourceId生成全局锁并插入分支事务信息到brach_table后就不会抛异常
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //undo日志刷盘
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事务提交
            targetConnection.commit();
        } catch (Throwable ex) {
          //......
        }
          //......
    }



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这里我们着重看一下register函数,其内部本质上就是通过RM客户端告知TC自己准备执行分支事务提交,帮我上一把全局锁并注册分支事务:

private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //向tc发起请求并获得register
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        //缓存到当前线程中
        context.setBranchId(branchId);
    }
1
2
3
4
5
6
7
8
9
10

最后这个注册的逻辑就会来到AbstractResourceManager的branchRegister上,可以看到它会携带着全局事务id和主键等数据发送请求给TC:

 @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            //传入全局事务id即xid
            request.setXid(xid);
            //基于当前数据主键生成lockkeys
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            //基于RM的netty客户端将其异步发送
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
         //......
            return response.getBranchId();
        } catch (TimeoutException toe) {
           //......
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# seata服务端处理分支事务请求

TC处理流程与上述文章同理,收到消息后基于request中的消息表定位到对应的处理器,我们这里最终会走到BranchRegisterRequest的处理器上,通过AbstractTCInboundHandler注册分支事务:

 @Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //tc注册分支事务入口
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                 //......
                }
            }
        }, request, response);
        return response;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

最终这段逻辑就会走到AbstractCore的branchRegister,大体执行的步骤是:

  1. 生成分支事务session
  2. 尝试获得数据全局锁lock_table
  3. 取锁成功将分支事务信息写入branch_table
  4. 返回branch_id给RM

对应源码逻辑如下,大体逻辑就说基于分支事务session生成全局锁存到lock_table后,将分支事务信息存到branch_table中:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
      //......
        return SessionHolder.lockAndExecute(globalSession, () -> {
             //......
            //获取分支事务的表信息并将其写入到lock_table中意味获得全局锁,上锁失败会抛异常
            branchSessionLock(globalSession, branchSession);

            try {
                //添加分支事务信息到branch_table中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                 //......
            }
             //......
             //返回分支事务id
            return branchSession.getBranchId();
        });
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

TC返回成功后,RM就会执行undo日志刷盘和本地事务提交,详情参考我们本节代码processGlobalTransactionCommit方法,这里不贴出了。

# RM生成回滚日志

对于java程序而言大部分SQL操作底层都是基于Executor执行器操作的,在上述代理执行commit方法前,seata底层将代理的连接即上文的connectionProxy通过AbstractDMLBaseExecutor执行SQL操作,该执会针对我们的连接代理进行如下逻辑处理:

  1. 判断连接代理connectionProxy是否是自动提交,若非自动提交则调用executeAutoCommitFalse方法,该方法会生成undoLog数据写入缓存,然后将RM当执行分支事务SQL,基于该执行结果生成后置镜像,最后将undo日志写入undo_log表中。
  2. 若开启自动提交则关闭自动提交后,复用executeAutoCommitFalse方法执行系统的undoLog和分支事务SQL的执行操作。

对应源码的整体工作链路图如下所示:

这里我们直接给出AbstractDMLBaseExecutor的doExecute方法作为入口,可以看到若开启自动提交则调用executeAutoCommitTrue,反之调用executeAutoCommitFalse:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //若自动提交则关闭自动提交,并生成undo信息存入缓冲区
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //直接生成undo log镜像写入缓存
            return executeAutoCommitFalse(args);
        }
    }
1
2
3
4
5
6
7
8
9
10
11

因为都会复用executeAutoCommitFalse这段逻辑,所以我们直接查看这个方法的逻辑,可以看到该逻辑内部会基于分支事务前后的数据生成前置和后置镜像:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //基于分支事务的SQL定位操作前的SQL生成前置镜像
        TableRecords beforeImage = beforeImage();
        //执行分支事务的SQL 
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //生成分支事务操作后置镜像
        TableRecords afterImage = afterImage(beforeImage);
        //将undoLog写入缓冲区
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 事务全局提交与回滚

TransactionalTemplate(即TM)驱动各种分支事务准备成功后,就会执行commitTransaction提交全局事务,对应的代码位于TransactionalTemplate的execute方法,该方法会通知TC驱动全局事务提交,而TC收到该请求之后,就会驱动各个分支事务提交事务,每个分支事务收到该请求后就会删除undoLog并提交各自未提交的事务:

public Object execute(TransactionalExecutor business) throws Throwable {
       		 //......

            try {
             
                //向TC发送请求开启全局事务
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    
                    //执行业务逻辑(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {            
                    //全局事务回滚
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

              
                //分支事务执行成功,提交全局事务
                commitTransaction(tx);

                return rs;
            } finally {
          //......
            }
        } finally {
          //......
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

步入其内部可以看到DefaultGlobalTransaction调用transactionManager即TM提交全局事务:

@Override
    public void commit() throws TransactionException {
       //......
        try {
            while (retry > 0) {
                try {
                    //执行全局事务提交
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                   //......
            }
        } finally {
          //......
        }
        //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

这个commit的逻辑也很简单,即告知TC要提交全局事务了:

  @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //通知TC提交全局事务
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }
1
2
3
4
5
6
7
8

对应的TC收到该请求后,对应的AbstractTCInboundHandler就会调用doGlobalCommit通知各个RM提交全局事务:

@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                //遍历RM提交各个分支事务
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                  //......
                }
            }
     //......

          //......


        }, request, response);
        return response;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

对应的我们可以来道该源码内部的DefaultCore的doGlobalCommit方法印证这一点,可以看到该方法会遍历各个分支事务调用branchCommit通知其提交或者回滚事务:

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
      //......
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //遍历全局事务中的分支事务
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //......
                }
                try {
                    //告知RM提交事务
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    //......
                } catch (Exception ex) {
                    //......
                }
                return CONTINUE;
            });
             //......
        }
        //......
        return success;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

最后请求达到RM上的DefaultRMHandler按照TC要求提交或者回滚事务:

 //RM提交分支事务
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    //RM回滚分支事务
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14

提交事务本质上就是提交后删除undoLog即可,这里我们以分支事务回滚为例,可以看到上述代码BranchRollbackResponse 会调用handle方法执行分支事务回滚,该方法最终会走到AbstractRMHandler的doBranchRollback,该方法会调动RM管理器将分支事务回滚:

 protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        //......
        //回滚分支事务
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        //将xid和处理结果状态响应给TC
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
       //......
    }
1
2
3
4
5
6
7
8
9
10
11
12

最终该方法内部就会调用AbstractUndoLogManager的undo解析当前分支事务的前置镜像数据,该方法内部执行逻辑为:

  1. 定位分支事务的undo日志数据
  2. 反序列化为undo对象
  3. 基于该undo对象信息解析出表名、列以及数据等信息。
  4. 通过undoExecutor 执行器将分支事务还原。

对应源码如下:

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
      //......

        for (; ; ) {
            try {
               //......

                // Find UNDO LOG
                //获取当前分支事务的undo镜像
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                   //......
                    //获取undo数据
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    //反序列化生成undo对象 branchUndoLog
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍历undo对象生成SQL还原分支事务值
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //获取表的表名、列的元信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //获取对应的执行执行器 将对应分支事务的表数据回滚
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                //......
            } catch (SQLIntegrityConstraintViolationException e) {
                //......
            } catch (Throwable e) {
                //......

            } finally {
               //......
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

# 小结

让我们来做个小结,总的来说seata实现数据库的AT模式分布式事务的流程为:

  1. 调用带有globalTransactional注解的方法执行业务逻辑。

  2. 该方法以TM的身份通知TC开启全局事务。

  3. TC收到通知后到global_table创建该方法的全局事务信息,这里以笔者某个下单业务的分布式事务场景为例,对应的数据如下所示:

  4. RM开始工作,各自找TC注册分支事务,基于当前数据生成全局锁存入lock_table,保证当前数据操作时没有其他事务干扰:

全局锁成功后TC将数据存入branch_table表,对应数据如下所示:

  1. RM完成分支事务注册后,持有本地锁的事务执行本地分支事务,成功后将生成分支数据的前后镜像undo表,如下所示: 这里我们以后置镜像为例子查看账户表修改后的字段值为例,可以看到该镜像将每一个字段的类型、值等信息都序列化为JSON生成undo镜像:

  1. TM感知到所有分支事务准备成功,通知TC将这些RM(分支事务)提交,即将undoLog删除,反之基于undoLog将数据回滚。

对应我们给出下面这段图,读者可以结合上面源码梳理一下全流程:

我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。

为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。

# 参考

Seata AT 模式:https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/ (opens new window)

深入解析Spring Bean的生命周期管理 :https://sharkchili.blog.csdn.net/article/details/120857166 (opens new window)

编辑 (opens new window)
上次更新: 2026/03/26, 01:05:31
Nacos源码环境搭建与调试指南
一文快速掌握docker的理念和基本使用

← Nacos源码环境搭建与调试指南 一文快速掌握docker的理念和基本使用→

最近更新
01
基于EasyExcel实现高效导出
03-25
02
从开源框架中学习那些实用的位运算技巧
03-25
03
浅谈分布式架构设计思想和常见优化手段
03-25
更多文章>
Theme by Vdoing | Copyright © 2025-2026 Evan Xu | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×
×