深度解析:基于 RocketMQ 实现分布式事务的技术实践与原理探究
# 写在文章开头
在上一篇文章Spring Boot自动装配原理以及实践 (opens new window)我们完成了服务通用日志监控组件的开发,确保每个服务都可以基于一个注解实现业务功能的监控。
而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问,之前我们不是基于Seata完成了分布式事务,为什么我们还要用到RocketMQ呢?
我们的再来回顾一下我们下单功能大抵是做以下三件事情:
- 创建订单,将订单记录存到数据库中。
- 扣款,记录用户扣款后钱包所剩下的额度。
- 扣除商品库存,并发放商品。
我们将该场景放到高并发场景下,这个功能势必要考虑性能和可靠性问题,所以我们在业务需求清楚明了的情况下,就希望能有一种方式确保下单功能在高并发场景保证性能、可靠性。
而Seata的AT模式确实可以保证最终一致性,但是seata的AT模式本质上是依赖于global_table、branch_table等数据表维护应用层分布式事务,在操作期间会涉及大量的更新和删除操作,随着时间的推移还是会出现大量的索引碎片,导致索引性能下降。
所以我们就考虑采用RocketMQ实现分布式事务,尽管RocketMQ对于分布式事务的实现业务侵入性相对强一些,但它可以保证业务层面的功能解耦从而提升并发性能,且RocketMQ还对消息消费可靠性做了许多不错的优化,例如:失败重试、死信队列等,所以我们还是尝试使用RocketMQ来改良我们的下单分布式事务问题。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 详解RocketMQ落地分布式事务案例
# 需求说明
用户下单大抵需要在三个服务中完成:
- 订单服务完成订单创建,基于用户传入的产品编码、用户编码、产品购买数生成订单信息,对应的调用参数如下:
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}
2
3
4
5
- 基于入参的用户代码定位到用户钱包金额,完成账户扣款。
- 基于产品和购买数完成库存扣减。
这其中会跨域三个服务,分别是订单服务创建订单、账户服务扣款、商品服务扣减库存。

# 落地思路
以我们业务为最终目标,RocketMQ实现分布式事务的原理是基于2PC的,流程大抵如下:
- 订单服务发送一个事务消息到消息队列,消息内容就是我们的订单信息,这里面包含用户账号、购买的产品代码、购买产品数量等数据。
MQ收到half消息,并回复ack确认。- 生产者
(订单服务order-service)得知我们发送的消息已被收到,订单服务则执行本地事务并提交事务,即将订单信息写入数据库中,同时在该事务内将订单插入结果写入transaction_log表中。 - 生产者
(订单服务order-service)完成本地事务的提交,告知MQ将事务消息commit,此时消费者就可以消费这条消息了,注意若生产者消费失败,则将消息rollback,一切就当没有发生过。 - 如果上述的消息是
commit则将消息持久化到commitLog中,以便后续MQ宕机或者服务宕机后依然可以继续消费这条没有被消费的消息。 (非必要步骤)若MQ长时间没有收到生产者的commit或者rollback的信号,则携带事务id找生产者查询transaction_log索要当前消息状态,如果看到对应的消息则判定生产者事务成功将消息commit给消费者消费,若没看到则说明生产者本地事务执行失败,回滚该消息。- 消费者即我们的用户服务或者库存服务收到消息则执行本地事务并提交,若失败则会不断重试,直到达到上限则将消息存到死信队列并告警。
- 人工介入查看死信队列查看失败消息手工补偿数据。

# 实践-基于RocketMQ实现分布式事务
# 部署RocketMQ(Linux环境)
在编写业务代码之前,我们必须完成一下RocketMQ的部署,首先我们自然要下载一下RocketMQ,下载地址如下,笔者下载的是rocketmq-all-4.8.0-bin-release这个版本:
https://rocketmq.apache.org/download/ (opens new window)
完成完成后,我们将其解压到自定义的路径,键入sudo vim /etc/profile配置MQ环境变量,完成后键入source /etc/profile使之生效,对应的配置内容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
2
3
需要注意的是笔者本次采用WSL的Ubuntu子系统时启动时脚本会抛出runserver.sh: 70: [[: Exec format error错误,尝试格式化和指令配置后都没有很好的解决,于是循着报错找到runserver.sh这行对应的脚本内容,该括弧本质上就是基于JDK内容配置对应的GC算法:

以笔者为里系统是jdk8,所以直接去掉判断用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}
2
3
4
5
6
7
8
完成后键入./mqnamesrv &将MQ启动,如果弹窗输出下面这条结果,则说明mq的NameServer启动成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
2
3
然后我们再键入./mqbroker -n 127.0.0.1:9876启动broker,需要注意的是默认情况下broker占用堆内存差不多是4g,所以读者本地部署时建议修改一下runbroker.sh的堆内存,如下图所示:

若弹窗输出下面所示的文字,则说明broker启动成功,自此mq就在windows环境部署成功了。我们就可以开始编码工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
# 服务引入MQ完成下单功能开发
# 服务引入RocketMQ依赖
完成RocketMQ部署之后,我们就可以着手编码工作了,首先我们要在在三个服务中引入RocketMQ的依赖,由于笔者的spring-boot版本比较老,所以这里笔者为了统一管理在父pom中指定了mq较新的版本号:
<!--rocketmq-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2
3
4
5
6
7
然后我们分别对order、account、product三个服务中引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
2
3
4
5
# 注册中心配置RocketMQ信息
由于我们的分布式事务涉及3个服务,而且mq的消费模式采用的是发布订阅模式,所以我们的生产者(order-service)和消费者(account-serivce)都配置为cloud-group
rocketmq.name-server=172.29.193.12:9876
# 指定消费者组
rocketmq.producer.group=cloud-group
2
3
之所以没有没将消费者2(product-service)也配置到cloud-group中的原因也很简单,同一个消息只能被同一个消费者组中的一个成员消费,假如我们的将product-service配置到同一个消费者组中就会出现因一条消息只能被一个服务消费而导致product-service收不到消息。

对此我们实现思路有两种:
- 将服务都放到同一个消费者组,消费模式改为广播模式。
- 将
product-service设置到别的消费者组中。
考虑后续扩展笔者选择方案2,将产品服务的订阅者放到消费者组2中:
rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2
2
# 创建消息日志表
我们在上文进行需求梳理时有提到一个MQServer没收到生产者本地事务执行状态进行回查的操作,所以我们在生产者在执行本地事务时,需要创建一张表记录生产者本地事务执行状态,建表SQL如下:
DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(50) DEFAULT NULL,
`log` varchar(500) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2
3
4
5
6
7
# 完成order服务half消息发送、监听、回查回调逻辑
我们的订单服务需要做以下三件事:
- 发送half消息给MQ。
- half消息发送成功执行本地事务并记录日志。
- 告知MQ可以提交事务消息。
所以我们需要定义一下消息格式,对象类中必须包含订单号、产品编码、用户编码、购买产品数量等信息。
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {
private static final long serialVersionUID = 1L;
//设置主键自增,避免插入时没必要的报错
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
/**
* 订单号
*/
private String orderNo;
/**
* 用户编码
*/
private String accountCode;
/**
* 产品编码
*/
private String productCode;
/**
* 产品扣减数量
*/
private Integer count;
/**
* 余额
*/
private BigDecimal amount;
/**
* 本次扣减金额
*/
private BigDecimal price;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
然后我们就可以编写控制层的代码了,通过获取前端传输的参数调用orderService完成half消息发送。
@PostMapping("/order/createOrderByMQ")
public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
log.info("基于mq完成用户下单流程,请求参数: " + JSON.toJSONString(orderDTO));
orderService.createOrderByMQ(orderDTO);
return ResultData.success("基于mq完成用户下单完成");
}
2
3
4
5
6
7
orderService的实现逻辑很简单,定义好消息设置消息头内容和消息载体的对象,通过sendMessageInTransaction方法完成半消息发送,需要了解一下消息的主题(topic)为ORDER_MSG_TOPIC,只有订阅这个主题的消费者才能消费这条消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void createOrderByMQ(OrderDto orderDto) {
//创建half消息对应的事务日志的id
String transactionId = UUID.randomUUID().toString();
//调用产品服务获取商品详情
ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
//计算总售价
BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
orderDto.setAmount(amount);
//将订单信息作为载体
Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//下单用户编码
.setHeader("accountCode", orderDto.getAccountCode())
//产品编码
.setHeader("productCode", orderDto.getProductCode())
//产品购买数
.setHeader("count", orderDto.getCount())
//下单金额
.setHeader("amount", amount)
.build();
//发送half消息
rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
完成half消息发送之后,我们就必须知晓消息发送结果才能确定是否执行本地事务并提交,所以我们的订单服务必须创建一个监听器了解half消息的发送情况,executeLocalTransaction方法就是mq成功收到半消息后的回调函数,一旦我们得知消息成功发送之后,MQ就会执行这个方法,笔者通过这个方法获取消息头的参数创建订单对象,调用createOrderWithRocketMqLog完成订单的创建的本地事务成功的日志记录。
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
private final IOrderService orderService;
private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 监听到发送half消息,执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("order执行本地事务");
try {
//解析消息头
MessageHeaders headers = message.getHeaders();
//获取购买金额
BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
//获取订单信息
Order order = Order.builder()
.accountCode((String) headers.get("accountCode"))
.amount(amount)
.productCode((String) headers.get("productCode"))
.count(Integer.valueOf(String.valueOf(headers.get("count"))))
.build();
//获取事务id
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
//执行本地事务和记录事务日志
orderService.createOrderWithRocketMqLog(order, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("创建订单失败,失败原因: {}", e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事务的检查,检查本地事务是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//获取事务ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("检查本地事务,事务ID:{}", transactionId);
//根据事务id从日志表检索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id", transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
//如果消息表存在,则说明生产者事务执行完成,回复commit
if (null != rocketmqTransactionLog) {
return RocketMQLocalTransactionState.COMMIT;
}
//回复rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
createOrderWithRocketMqLog做了两件事,分别是插入订单信息和创建消息日志,这里笔者用到了事务注解确保了两个操作的原子性。
这样一来,MQserver后续的回查逻辑完全可以基于RocketmqTransactionLog 进行判断,如果消息的事务id在表中存在,则说明生产者本地事务成功,反之就是失败。
@Transactional(rollbackFor = Exception.class)
@Override
public void createOrderWithRocketMqLog(Order order, String transactionId) {
//创建订单编号
order.setOrderNo(UUID.randomUUID().toString());
//插入订单信息
orderMapper.insert(order);
//事务日志
RocketmqTransactionLog log = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("执行创建订单操作")
.build();
rocketmqTransactionLogMapper.insert(log);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
补充一下基于MP生成的RocketmqTransactionLog 类代码
@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog对象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
private String transactionId;
private String log;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 完成account、product监听事件
然后我们就可以实现用户服务和商品服务的监听事件了,一旦生产者提交事务消息之后,这几个消费者都会收到这个topic(主题)的消息,进而完成当前服务的业务逻辑。
先来看看实现扣款的用户服务,我们的监听器继承了RocketMQListener,基于@RocketMQMessageListener注解设置它订阅的主题为createByRocketMQ,一旦收到这个主题的消息时这个监听器就会执行onMessage方法,我们的逻辑很简单,就是获取消息的内容完成扣款,唯一需要注意的就是线程安全问题。我们的压测的情况下,单用户可能会频繁创建订单,在并发期间同一个用户的扣款消息可能同时到达扣款服务中,这就导致单位时间内扣款服务从数据库中查询到相同的余额,执行相同的扣款逻辑,导致金额少扣了。

所以我们必须保证扣款操作互斥和原子化,考虑到笔者当前项目环境是单体,所以就用简单的synchronized 关键字解决问题。
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {
@Resource
private AccountMapper accountMapper;
//强制转为runTimeException
@SneakyThrows
@Override
public void onMessage(OrderDto orderDto) {
log.info("账户服务收到消息,开始消费");
QueryWrapper<Account> query = new QueryWrapper<>();
query.eq("account_code", orderDto.getAccountCode());
//解决单体服务下线程安全问题
synchronized (this){
Account account = accountMapper.selectOne(query);
BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
if (subtract.compareTo(BigDecimal.ZERO)<0){
throw new Exception("用户余额不足");
}
account.setAmount(subtract);
log.info("更新账户服务,请求参数:{}", JSON.toJSONString(account));
accountMapper.updateById(account);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
然后就说商品服务,逻辑也很简单,也同样要注意一下线程安全问题:
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
@Resource
private ProductMapper productMapper;
@Override
public void onMessage(OrderDto orderDto) {
log.info(" 产品服务收到消息,开始消费");
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("product_code",orderDto.getProductCode());
synchronized (this){
Product product = productMapper.selectOne(queryWrapper);
if (product.getCount()<orderDto.getCount()){
throw new RuntimeException("库存不足");
}
product.setCount(product.getCount()-orderDto.getCount());
log.info("更新产品库存信息,请求参数:{}", JSON.toJSONString(product));
productMapper.updateById(product);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 基于几个测试用例验证MQ半消息事务
# 前置准备与说明
完整编码工作后,自测是非常有必要的,我们日常完成开发任务后,都会结合需求场景以及功能编排一些自测用例查看最终结果是否与预期一致。
需要注意的是由于订单业务逻辑较为复杂,很多业务场景一篇博客是不可能全部覆盖,所以这里我们就测试一下基于RocketMQ实现分布式事务常见的几个问题场景是否和预期一致。
在测试前我们必须做好前置准备工作,准备功能测试时涉及到的SQL语句,以本次用户购买产品的业务为例,涉及到订单表、用户账户信息表、产品表、以及生产者本地事务日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;
2
3
4
在每次测试完成之后,我们希望数据能够还原,所以这里也需要准备一下每次测试结束后的更新语句,由于订单表和消息日志表都是主键自增,考虑到这两张表只涉及插入,所以笔者为了重置主键的值采取的是truncate语句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;
2
3
4
# 测试正常消费
第一个用例是查看所有服务都正常的情况下,订单表是否有数据,用户表的用户是否会正常扣款,以及商品表库存是否会扣减。
测试前,我们先查看订单表,确认没有数据

查看我们的测试用户,钱包额度为10000

再查看库存表,可以看到数量为1000

确认完数据之后,我们就可以测试服务是否按照预期的方式执行,将所有服务启动

我们通过网关发起调用,请求地址如下:
http://localhost:8090/order/order/createOrderByMQ
请求参数如下,从参数可以看出这个请求意为用户代码(accountCode)为demoData这个用户希望购买1个(count)产品代码(productCode)为P001的产品,该产品当前售价(price)为1元。
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}
2
3
4
5
调用完成后,查看订单表,订单数据生成无误:

查看用户服务是否完成用户扣款,扣款无误:

查看产品表,可以看到产品数量也准确扣减:

# 测试生产者commit提交失败
我们希望测试一下发送完half消息之后,执行本地事务完成,但是未提交commit请求时,MQServer是否会调用回查逻辑。
为了完成这一点我们必须按照以下两个步骤执行:
- 在订单服务提交事务消息处打个断点。

- 发起请求,当代码执行到这里的时候通过
jps定位到进程号,将其强制杀死。如下所示,我们的代码执行到了提交事务消息这一步:

我们通过jps定位并将其杀死

- 完成这些步骤后,我们再次将服务启动,等待片刻之后可以发现,
MQServer会调用checkLocalTransaction回查生产者本地事务的情况。我们放行这块代码让程序执行下去,最后再查看数据库中的数据结果是否符合预期。

# 测试消费者消费失败
测试消费者执行报错后是否会进行重试,这一点就比较好测试了,我们在消费者监听器中插入随便插入一个报错查看其是否会不断重试。这里笔者就不多做演示,实验结果是会进行不断重试,当重试次数达到阈值时会将结果存到死信队列中。

# 压测MQ和Seata的性能
由于MQ是采用异步消费的形式解耦了服务间的业务,而我们的Seata采用默认的AT模式每次执行分布式事务时都会需要借助undo-log、全局锁等的方式保证最终一致性。所以理论上RocketMQ的性能肯定是高于Seata的,对此我们不妨使用Jmeter进行压测来验证一下。
本次压测只用了1000个并发,MQ和seata的压测结果如下,可以看到MQ无论从执行时间还是成功率都远远优秀于Seata的。
MQ的压测结果:

Seata的压测结果,可以看到大量的数据因为lock_table锁超时而导致失败,所以整体性能表现非常差劲:

# 详解RocketMQ落地分布式事务常见问题
# RocketMQ 如何保证事务的最终一致性
最终一致性是一种允许软状态存在的分布式事务解决方案,RocketMQ 保证事务最终一致性的方式主要是依赖生产者本地事务和消息可靠发送的原子性来最大努力保证最终一致性,注意这里笔者所强调的尽最大努力交付。
之所以说是最大努力交付是说RocketMQ是通过保证生产者事务和消息发送可靠性的原子性和一致性,由此保证消费者一定能够消费到消息,理想情况下,只要消费者能够正确消费消息,事务结果最终是可以保证一致性的,但是复杂的系统因素消费者可能会存在消费失败的情况,此时事务最终一致性就无法保证,业界的做法是通过手动操作或者脚本等方式完成数据补偿。

# 什么是half消息
half消息即半消息,和普通消息的区别是该消息不会立马被消费者消费,原因是half消息的存在是为了保证生产者本地事务和消费者的原子性和一致性,其过程如上文所介绍,初始发送的half消息是存储在MQ一个内存队列中(并未投递到topic中),只有生产者本地事务成功并发送commit通知后,这个消息才会被持久化到commitLog同时提交到topic队列中,此时消费者才能够消费该消息并执行本地事务。
# 为什么要先发送half消息再执行本地事务?先执行本地事务,成功后在发送不行吗?
先发送half消息的原因是为了尽可能确保生产者和消息队列通信正常,只有通信正常了才能确保生产者本地事务和消息发送的原子性和一致性,由此保证分布式事务的可靠性。
先执行本地事务,执行成功后再发送存在一个问题,试想一下,假设我们本地事务执行成功,但是发送的消息因为网络波动等诸多原因导致MQ没有收到消息,此时生产者和消费者的分布式事务就会出现数据不一致问题。

而half消息则不同,它会优先发送一个消费者感知不到的half消息确认通信可达,然后执行本地事务后降消息设置未commit让消费者消费,即使说commit消息未收到,因为half消息的存在,MQ在指定超时先限制后也可以通过回查的方式到生产者事务表查询执行情况。
# 如果mq收到half消息,准备发送success信号的消息给生产者,但因为网络波动导致生产者没有收到这个消息要怎么办?
此时生产者就会认为half消息发送失败,本地事务不执行,随着时间推移MQ长时间没收到commit或者rollback消息就会回查生产者消息日志表,明确没看到数据则知晓生产者本地事务执行失败,直接rollback掉half消息,而消费者全程无感知,业务上的一致性也是可以保证。

# MQ没有收到生产者(订单服务)的commit或者rollback信号怎么保证事务最终一致性?
常规的做法就是建立一张表记录消息状态,只要我们订单信息插入成功就需要日志一下这条数据,所以我们必须保证订单数据插入和日志插入表中的原子性,确保生产者的事务和消息日志的ACID:

# 如果生产者执行本地事务失败了怎么办?
这一点前面的部分也已经说明,首先将本地会事务回滚,并向消息队列提交一个rollback的请求不提交half消息,消息就不会被消费者消费,保证最终一致性。
# 前面说的都是事务流程?这和事务消息如何保证数据最终一致性有什么关系?
生产者和消息队列事务流程可以确保生产者和消息队列发送的一致性,确保写操作都是同时成功或者失败。只有保证两者正常通信,才能确保消费者可以消费MQ中的消息从而完成数据最终一致性。
# 消费者提交本地事务失败了怎么办?
我们都知道消息队列只能保证消息可靠性,而无法保证分布式事务的强一致性,出现这种情况,消费者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 会在一定时间后,继续将这条消息推送给消费者,消费者就可以继续执行本地事务并提交了,直到成功消息队列会进行N次重试,如果还是失败,则可以到死信队列中查看失败消息,然后通过补偿机制实现分布式事务最终一致性。
# 小结
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 参考
SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务:https://mp.weixin.qq.com/s?__biz=MzAwMTk4NjM1MA==&mid=2247491582&idx=1&sn=9f5276015aa78fe0198363f368b8ea3c&chksm=9ad005bfada78ca9146fb50283451b4a52944619d40af1ec08d99bc2b5f94e5eca345bbba4d8&token=1863605670&lang=zh_CN#rd (opens new window)
Lombok注解-@SneakyThrows:https://www.jianshu.com/p/7d0ed3aef34b (opens new window)
RocketMq 广播模式:https://www.jianshu.com/p/93f984f5a68f (opens new window)
使用RocketMQTemplate发送各种消息:https://juejin.cn/post/6850418115382738958 (opens new window)
RocketMQ事务消息如何保证数据的最终一致性:https://juejin.cn/post/6936441094880264229 (opens new window)
RocketMQ 在Linux上的安装:https://blog.csdn.net/xhmico/article/details/122938904 (opens new window)