禅与计算机 禅与计算机
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

sharkchili

计算机禅修者
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

  • 消息队列

    • Kafka

    • RocketMQ

      • 一文快速入门消息队列
      • 消息队列RocketMQ入门指南
      • 深度解析:基于 RocketMQ 实现分布式事务的技术实践与原理探究
        • 写在文章开头
        • 详解RocketMQ落地分布式事务案例
          • 需求说明
          • 落地思路
        • 实践-基于RocketMQ实现分布式事务
          • 部署RocketMQ(Linux环境)
          • 服务引入MQ完成下单功能开发
          • 服务引入RocketMQ依赖
          • 注册中心配置RocketMQ信息
          • 创建消息日志表
          • 完成order服务half消息发送、监听、回查回调逻辑
          • 完成account、product监听事件
        • 基于几个测试用例验证MQ半消息事务
          • 前置准备与说明
          • 测试正常消费
          • 测试生产者commit提交失败
          • 测试消费者消费失败
        • 压测MQ和Seata的性能
        • 详解RocketMQ落地分布式事务常见问题
          • RocketMQ 如何保证事务的最终一致性
          • 什么是half消息
          • 为什么要先发送half消息再执行本地事务?先执行本地事务,成功后在发送不行吗?
          • 如果mq收到half消息,准备发送success信号的消息给生产者,但因为网络波动导致生产者没有收到这个消息要怎么办?
          • MQ没有收到生产者(订单服务)的commit或者rollback信号怎么保证事务最终一致性?
          • 如果生产者执行本地事务失败了怎么办?
          • 前面说的都是事务流程?这和事务消息如何保证数据最终一致性有什么关系?
          • 消费者提交本地事务失败了怎么办?
        • 小结
        • 参考
      • RocketMQ容器化最佳实践
      • RocketMQ常见问题与深度解析
  • Nginx

  • 中间件
  • 消息队列
  • RocketMQ
sharkchili
2023-02-23
目录

深度解析:基于 RocketMQ 实现分布式事务的技术实践与原理探究

# 写在文章开头

在上一篇文章Spring Boot自动装配原理以及实践 (opens new window)我们完成了服务通用日志监控组件的开发,确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问,之前我们不是基于Seata完成了分布式事务,为什么我们还要用到RocketMQ呢?

我们的再来回顾一下我们下单功能大抵是做以下三件事情:

  1. 创建订单,将订单记录存到数据库中。
  2. 扣款,记录用户扣款后钱包所剩下的额度。
  3. 扣除商品库存,并发放商品。

我们将该场景放到高并发场景下,这个功能势必要考虑性能和可靠性问题,所以我们在业务需求清楚明了的情况下,就希望能有一种方式确保下单功能在高并发场景保证性能、可靠性。 而Seata的AT模式确实可以保证最终一致性,但是seata的AT模式本质上是依赖于global_table、branch_table等数据表维护应用层分布式事务,在操作期间会涉及大量的更新和删除操作,随着时间的推移还是会出现大量的索引碎片,导致索引性能下降。

所以我们就考虑采用RocketMQ实现分布式事务,尽管RocketMQ对于分布式事务的实现业务侵入性相对强一些,但它可以保证业务层面的功能解耦从而提升并发性能,且RocketMQ还对消息消费可靠性做了许多不错的优化,例如:失败重试、死信队列等,所以我们还是尝试使用RocketMQ来改良我们的下单分布式事务问题。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。

同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 详解RocketMQ落地分布式事务案例

# 需求说明

用户下单大抵需要在三个服务中完成:

  1. 订单服务完成订单创建,基于用户传入的产品编码、用户编码、产品购买数生成订单信息,对应的调用参数如下:
{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}
1
2
3
4
5
  1. 基于入参的用户代码定位到用户钱包金额,完成账户扣款。
  2. 基于产品和购买数完成库存扣减。

这其中会跨域三个服务,分别是订单服务创建订单、账户服务扣款、商品服务扣减库存。

# 落地思路

以我们业务为最终目标,RocketMQ实现分布式事务的原理是基于2PC的,流程大抵如下:

  1. 订单服务发送一个事务消息到消息队列,消息内容就是我们的订单信息,这里面包含用户账号、购买的产品代码、购买产品数量等数据。
  2. MQ收到half消息,并回复ack确认。
  3. 生产者(订单服务order-service)得知我们发送的消息已被收到,订单服务则执行本地事务并提交事务,即将订单信息写入数据库中,同时在该事务内将订单插入结果写入transaction_log表中。
  4. 生产者(订单服务order-service)完成本地事务的提交,告知MQ将事务消息commit,此时消费者就可以消费这条消息了,注意若生产者消费失败,则将消息rollback,一切就当没有发生过。
  5. 如果上述的消息是commit则将消息持久化到commitLog中,以便后续MQ宕机或者服务宕机后依然可以继续消费这条没有被消费的消息。
  6. (非必要步骤)若MQ长时间没有收到生产者的commit或者rollback的信号,则携带事务id找生产者查询transaction_log索要当前消息状态,如果看到对应的消息则判定生产者事务成功将消息commit给消费者消费,若没看到则说明生产者本地事务执行失败,回滚该消息。
  7. 消费者即我们的用户服务或者库存服务收到消息则执行本地事务并提交,若失败则会不断重试,直到达到上限则将消息存到死信队列并告警。
  8. 人工介入查看死信队列查看失败消息手工补偿数据。

# 实践-基于RocketMQ实现分布式事务

# 部署RocketMQ(Linux环境)

在编写业务代码之前,我们必须完成一下RocketMQ的部署,首先我们自然要下载一下RocketMQ,下载地址如下,笔者下载的是rocketmq-all-4.8.0-bin-release这个版本:

https://rocketmq.apache.org/download/ (opens new window)

完成完成后,我们将其解压到自定义的路径,键入sudo vim /etc/profile配置MQ环境变量,完成后键入source /etc/profile使之生效,对应的配置内容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

1
2
3

需要注意的是笔者本次采用WSL的Ubuntu子系统时启动时脚本会抛出runserver.sh: 70: [[: Exec format error错误,尝试格式化和指令配置后都没有很好的解决,于是循着报错找到runserver.sh这行对应的脚本内容,该括弧本质上就是基于JDK内容配置对应的GC算法:

以笔者为里系统是jdk8,所以直接去掉判断用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}
1
2
3
4
5
6
7
8

完成后键入./mqnamesrv &将MQ启动,如果弹窗输出下面这条结果,则说明mq的NameServer启动成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
1
2
3

然后我们再键入./mqbroker -n 127.0.0.1:9876启动broker,需要注意的是默认情况下broker占用堆内存差不多是4g,所以读者本地部署时建议修改一下runbroker.sh的堆内存,如下图所示:

若弹窗输出下面所示的文字,则说明broker启动成功,自此mq就在windows环境部署成功了。我们就可以开始编码工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
1

# 服务引入MQ完成下单功能开发

# 服务引入RocketMQ依赖

完成RocketMQ部署之后,我们就可以着手编码工作了,首先我们要在在三个服务中引入RocketMQ的依赖,由于笔者的spring-boot版本比较老,所以这里笔者为了统一管理在父pom中指定了mq较新的版本号:

   <!--rocketmq-->
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
1
2
3
4
5
6
7

然后我们分别对order、account、product三个服务中引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
           
        </dependency>
1
2
3
4
5

# 注册中心配置RocketMQ信息

由于我们的分布式事务涉及3个服务,而且mq的消费模式采用的是发布订阅模式,所以我们的生产者(order-service)和消费者(account-serivce)都配置为cloud-group

rocketmq.name-server=172.29.193.12:9876
# 指定消费者组
rocketmq.producer.group=cloud-group
1
2
3

之所以没有没将消费者2(product-service)也配置到cloud-group中的原因也很简单,同一个消息只能被同一个消费者组中的一个成员消费,假如我们的将product-service配置到同一个消费者组中就会出现因一条消息只能被一个服务消费而导致product-service收不到消息。

对此我们实现思路有两种:

  1. 将服务都放到同一个消费者组,消费模式改为广播模式。
  2. 将product-service设置到别的消费者组中。

考虑后续扩展笔者选择方案2,将产品服务的订阅者放到消费者组2中:

rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2
1
2

# 创建消息日志表

我们在上文进行需求梳理时有提到一个MQServer没收到生产者本地事务执行状态进行回查的操作,所以我们在生产者在执行本地事务时,需要创建一张表记录生产者本地事务执行状态,建表SQL如下:

DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `transaction_id` varchar(50) DEFAULT NULL,
  `log` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1
2
3
4
5
6
7

# 完成order服务half消息发送、监听、回查回调逻辑

我们的订单服务需要做以下三件事:

  1. 发送half消息给MQ。
  2. half消息发送成功执行本地事务并记录日志。
  3. 告知MQ可以提交事务消息。

所以我们需要定义一下消息格式,对象类中必须包含订单号、产品编码、用户编码、购买产品数量等信息。

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {

    private static final long serialVersionUID = 1L;

	//设置主键自增,避免插入时没必要的报错
    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 用户编码
     */
    private String accountCode;

    /**
     * 产品编码
     */
    private String productCode;

    /**
     * 产品扣减数量
     */
    private Integer count;

    /**
     * 余额
     */
    private BigDecimal amount;

    /**
     * 本次扣减金额
     */
    private BigDecimal price;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

然后我们就可以编写控制层的代码了,通过获取前端传输的参数调用orderService完成half消息发送。

@PostMapping("/order/createOrderByMQ")
    public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
        log.info("基于mq完成用户下单流程,请求参数: " + JSON.toJSONString(orderDTO));
        orderService.createOrderByMQ(orderDTO);
        return ResultData.success("基于mq完成用户下单完成");

    }
1
2
3
4
5
6
7

orderService的实现逻辑很简单,定义好消息设置消息头内容和消息载体的对象,通过sendMessageInTransaction方法完成半消息发送,需要了解一下消息的主题(topic)为ORDER_MSG_TOPIC,只有订阅这个主题的消费者才能消费这条消息:



 @Autowired
    private RocketMQTemplate rocketMQTemplate;

@Override
    public void createOrderByMQ(OrderDto orderDto) {


        //创建half消息对应的事务日志的id
        String transactionId = UUID.randomUUID().toString();

        //调用产品服务获取商品详情
        ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
        //计算总售价
        BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
        orderDto.setAmount(amount);

        //将订单信息作为载体
        Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                //下单用户编码
                .setHeader("accountCode", orderDto.getAccountCode())
                //产品编码
                .setHeader("productCode", orderDto.getProductCode())
                //产品购买数
                .setHeader("count", orderDto.getCount())
                //下单金额
                .setHeader("amount", amount)
                .build();

        //发送half消息
        rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);


    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

完成half消息发送之后,我们就必须知晓消息发送结果才能确定是否执行本地事务并提交,所以我们的订单服务必须创建一个监听器了解half消息的发送情况,executeLocalTransaction方法就是mq成功收到半消息后的回调函数,一旦我们得知消息成功发送之后,MQ就会执行这个方法,笔者通过这个方法获取消息头的参数创建订单对象,调用createOrderWithRocketMqLog完成订单的创建的本地事务成功的日志记录。

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
    private final IOrderService orderService;
    private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;

    /**
     * 监听到发送half消息,执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("order执行本地事务");


        try {
            //解析消息头
            MessageHeaders headers = message.getHeaders();
            //获取购买金额
            BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
            //获取订单信息
            Order order = Order.builder()
                    .accountCode((String) headers.get("accountCode"))
                    .amount(amount)
                    .productCode((String) headers.get("productCode"))
                    .count(Integer.valueOf(String.valueOf(headers.get("count"))))
                    .build();
            //获取事务id
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            //执行本地事务和记录事务日志
            orderService.createOrderWithRocketMqLog(order, transactionId);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("创建订单失败,失败原因: {}", e.getMessage(), e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        
    }

    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}", transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id", transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        //如果消息表存在,则说明生产者事务执行完成,回复commit
        if (null != rocketmqTransactionLog) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        //回复rollback
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

createOrderWithRocketMqLog做了两件事,分别是插入订单信息和创建消息日志,这里笔者用到了事务注解确保了两个操作的原子性。 这样一来,MQserver后续的回查逻辑完全可以基于RocketmqTransactionLog 进行判断,如果消息的事务id在表中存在,则说明生产者本地事务成功,反之就是失败。

 @Transactional(rollbackFor = Exception.class)
    @Override
    public void createOrderWithRocketMqLog(Order order, String transactionId) {
        //创建订单编号
        order.setOrderNo(UUID.randomUUID().toString());
        //插入订单信息
        orderMapper.insert(order);
        //事务日志
        RocketmqTransactionLog log = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("执行创建订单操作")
                .build();
        rocketmqTransactionLogMapper.insert(log);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14

补充一下基于MP生成的RocketmqTransactionLog 类代码

@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog对象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    private String transactionId;

    private String log;


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 完成account、product监听事件

然后我们就可以实现用户服务和商品服务的监听事件了,一旦生产者提交事务消息之后,这几个消费者都会收到这个topic(主题)的消息,进而完成当前服务的业务逻辑。

先来看看实现扣款的用户服务,我们的监听器继承了RocketMQListener,基于@RocketMQMessageListener注解设置它订阅的主题为createByRocketMQ,一旦收到这个主题的消息时这个监听器就会执行onMessage方法,我们的逻辑很简单,就是获取消息的内容完成扣款,唯一需要注意的就是线程安全问题。我们的压测的情况下,单用户可能会频繁创建订单,在并发期间同一个用户的扣款消息可能同时到达扣款服务中,这就导致单位时间内扣款服务从数据库中查询到相同的余额,执行相同的扣款逻辑,导致金额少扣了。

所以我们必须保证扣款操作互斥和原子化,考虑到笔者当前项目环境是单体,所以就用简单的synchronized 关键字解决问题。

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {

    @Resource
    private AccountMapper accountMapper;

    //强制转为runTimeException
    @SneakyThrows
    @Override
    public void onMessage(OrderDto orderDto) {
        log.info("账户服务收到消息,开始消费");
        QueryWrapper<Account> query = new QueryWrapper<>();
        query.eq("account_code", orderDto.getAccountCode());
        //解决单体服务下线程安全问题
        synchronized (this){
            Account account = accountMapper.selectOne(query);
            BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
            if (subtract.compareTo(BigDecimal.ZERO)<0){
                throw new Exception("用户余额不足");
            }
            account.setAmount(subtract);
            log.info("更新账户服务,请求参数:{}", JSON.toJSONString(account));
            accountMapper.updateById(account);
        }


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

然后就说商品服务,逻辑也很简单,也同样要注意一下线程安全问题:

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
    @Resource
    private ProductMapper productMapper;

    @Override
    public void onMessage(OrderDto orderDto) {
        log.info(" 产品服务收到消息,开始消费");
        QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
        queryWrapper.eq("product_code",orderDto.getProductCode());
        synchronized (this){
            Product product = productMapper.selectOne(queryWrapper);
            if (product.getCount()<orderDto.getCount()){
                throw new RuntimeException("库存不足");
            }

            product.setCount(product.getCount()-orderDto.getCount());
            log.info("更新产品库存信息,请求参数:{}", JSON.toJSONString(product));
            productMapper.updateById(product);
        }



    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 基于几个测试用例验证MQ半消息事务

# 前置准备与说明

完整编码工作后,自测是非常有必要的,我们日常完成开发任务后,都会结合需求场景以及功能编排一些自测用例查看最终结果是否与预期一致。 需要注意的是由于订单业务逻辑较为复杂,很多业务场景一篇博客是不可能全部覆盖,所以这里我们就测试一下基于RocketMQ实现分布式事务常见的几个问题场景是否和预期一致。

在测试前我们必须做好前置准备工作,准备功能测试时涉及到的SQL语句,以本次用户购买产品的业务为例,涉及到订单表、用户账户信息表、产品表、以及生产者本地事务日志表。

SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;
1
2
3
4

在每次测试完成之后,我们希望数据能够还原,所以这里也需要准备一下每次测试结束后的更新语句,由于订单表和消息日志表都是主键自增,考虑到这两张表只涉及插入,所以笔者为了重置主键的值采取的是truncate语句。

truncate  table  t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;
1
2
3
4

# 测试正常消费

第一个用例是查看所有服务都正常的情况下,订单表是否有数据,用户表的用户是否会正常扣款,以及商品表库存是否会扣减。

测试前,我们先查看订单表,确认没有数据

查看我们的测试用户,钱包额度为10000

再查看库存表,可以看到数量为1000

确认完数据之后,我们就可以测试服务是否按照预期的方式执行,将所有服务启动

我们通过网关发起调用,请求地址如下:

http://localhost:8090/order/order/createOrderByMQ
1

请求参数如下,从参数可以看出这个请求意为用户代码(accountCode)为demoData这个用户希望购买1个(count)产品代码(productCode)为P001的产品,该产品当前售价(price)为1元。

{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}
1
2
3
4
5

调用完成后,查看订单表,订单数据生成无误:

查看用户服务是否完成用户扣款,扣款无误:

查看产品表,可以看到产品数量也准确扣减:

# 测试生产者commit提交失败

我们希望测试一下发送完half消息之后,执行本地事务完成,但是未提交commit请求时,MQServer是否会调用回查逻辑。

为了完成这一点我们必须按照以下两个步骤执行:

  1. 在订单服务提交事务消息处打个断点。

  1. 发起请求,当代码执行到这里的时候通过jps定位到进程号,将其强制杀死。如下所示,我们的代码执行到了提交事务消息这一步:

我们通过jps定位并将其杀死

  1. 完成这些步骤后,我们再次将服务启动,等待片刻之后可以发现,MQServer会调用checkLocalTransaction回查生产者本地事务的情况。我们放行这块代码让程序执行下去,最后再查看数据库中的数据结果是否符合预期。

# 测试消费者消费失败

测试消费者执行报错后是否会进行重试,这一点就比较好测试了,我们在消费者监听器中插入随便插入一个报错查看其是否会不断重试。这里笔者就不多做演示,实验结果是会进行不断重试,当重试次数达到阈值时会将结果存到死信队列中。

# 压测MQ和Seata的性能

由于MQ是采用异步消费的形式解耦了服务间的业务,而我们的Seata采用默认的AT模式每次执行分布式事务时都会需要借助undo-log、全局锁等的方式保证最终一致性。所以理论上RocketMQ的性能肯定是高于Seata的,对此我们不妨使用Jmeter进行压测来验证一下。

本次压测只用了1000个并发,MQ和seata的压测结果如下,可以看到MQ无论从执行时间还是成功率都远远优秀于Seata的。

MQ的压测结果:

Seata的压测结果,可以看到大量的数据因为lock_table锁超时而导致失败,所以整体性能表现非常差劲:

# 详解RocketMQ落地分布式事务常见问题

# RocketMQ 如何保证事务的最终一致性

最终一致性是一种允许软状态存在的分布式事务解决方案,RocketMQ 保证事务最终一致性的方式主要是依赖生产者本地事务和消息可靠发送的原子性来最大努力保证最终一致性,注意这里笔者所强调的尽最大努力交付。

之所以说是最大努力交付是说RocketMQ是通过保证生产者事务和消息发送可靠性的原子性和一致性,由此保证消费者一定能够消费到消息,理想情况下,只要消费者能够正确消费消息,事务结果最终是可以保证一致性的,但是复杂的系统因素消费者可能会存在消费失败的情况,此时事务最终一致性就无法保证,业界的做法是通过手动操作或者脚本等方式完成数据补偿。

# 什么是half消息

half消息即半消息,和普通消息的区别是该消息不会立马被消费者消费,原因是half消息的存在是为了保证生产者本地事务和消费者的原子性和一致性,其过程如上文所介绍,初始发送的half消息是存储在MQ一个内存队列中(并未投递到topic中),只有生产者本地事务成功并发送commit通知后,这个消息才会被持久化到commitLog同时提交到topic队列中,此时消费者才能够消费该消息并执行本地事务。

# 为什么要先发送half消息再执行本地事务?先执行本地事务,成功后在发送不行吗?

先发送half消息的原因是为了尽可能确保生产者和消息队列通信正常,只有通信正常了才能确保生产者本地事务和消息发送的原子性和一致性,由此保证分布式事务的可靠性。

先执行本地事务,执行成功后再发送存在一个问题,试想一下,假设我们本地事务执行成功,但是发送的消息因为网络波动等诸多原因导致MQ没有收到消息,此时生产者和消费者的分布式事务就会出现数据不一致问题。

而half消息则不同,它会优先发送一个消费者感知不到的half消息确认通信可达,然后执行本地事务后降消息设置未commit让消费者消费,即使说commit消息未收到,因为half消息的存在,MQ在指定超时先限制后也可以通过回查的方式到生产者事务表查询执行情况。

# 如果mq收到half消息,准备发送success信号的消息给生产者,但因为网络波动导致生产者没有收到这个消息要怎么办?

此时生产者就会认为half消息发送失败,本地事务不执行,随着时间推移MQ长时间没收到commit或者rollback消息就会回查生产者消息日志表,明确没看到数据则知晓生产者本地事务执行失败,直接rollback掉half消息,而消费者全程无感知,业务上的一致性也是可以保证。

# MQ没有收到生产者(订单服务)的commit或者rollback信号怎么保证事务最终一致性?

常规的做法就是建立一张表记录消息状态,只要我们订单信息插入成功就需要日志一下这条数据,所以我们必须保证订单数据插入和日志插入表中的原子性,确保生产者的事务和消息日志的ACID:

# 如果生产者执行本地事务失败了怎么办?

这一点前面的部分也已经说明,首先将本地会事务回滚,并向消息队列提交一个rollback的请求不提交half消息,消息就不会被消费者消费,保证最终一致性。

# 前面说的都是事务流程?这和事务消息如何保证数据最终一致性有什么关系?

生产者和消息队列事务流程可以确保生产者和消息队列发送的一致性,确保写操作都是同时成功或者失败。只有保证两者正常通信,才能确保消费者可以消费MQ中的消息从而完成数据最终一致性。

# 消费者提交本地事务失败了怎么办?

我们都知道消息队列只能保证消息可靠性,而无法保证分布式事务的强一致性,出现这种情况,消费者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 会在一定时间后,继续将这条消息推送给消费者,消费者就可以继续执行本地事务并提交了,直到成功消息队列会进行N次重试,如果还是失败,则可以到死信队列中查看失败消息,然后通过补偿机制实现分布式事务最终一致性。

# 小结

我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。

同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 参考

SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务:https://mp.weixin.qq.com/s?__biz=MzAwMTk4NjM1MA==&mid=2247491582&idx=1&sn=9f5276015aa78fe0198363f368b8ea3c&chksm=9ad005bfada78ca9146fb50283451b4a52944619d40af1ec08d99bc2b5f94e5eca345bbba4d8&token=1863605670&lang=zh_CN#rd (opens new window)

Lombok注解-@SneakyThrows:https://www.jianshu.com/p/7d0ed3aef34b (opens new window)

RocketMq 广播模式:https://www.jianshu.com/p/93f984f5a68f (opens new window)

使用RocketMQTemplate发送各种消息:https://juejin.cn/post/6850418115382738958 (opens new window)

RocketMQ事务消息如何保证数据的最终一致性:https://juejin.cn/post/6936441094880264229 (opens new window)

RocketMQ 在Linux上的安装:https://blog.csdn.net/xhmico/article/details/122938904 (opens new window)

编辑 (opens new window)
上次更新: 2026/03/26, 01:05:31
消息队列RocketMQ入门指南
RocketMQ容器化最佳实践

← 消息队列RocketMQ入门指南 RocketMQ容器化最佳实践→

最近更新
01
基于EasyExcel实现高效导出
03-25
02
从开源框架中学习那些实用的位运算技巧
03-25
03
浅谈分布式架构设计思想和常见优化手段
03-25
更多文章>
Theme by Vdoing | Copyright © 2025-2026 Evan Xu | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×
×