消息队列RocketMQ入门指南
# 写在文章开头
在当今的分布式系统中,消息队列(Message Queue)作为解耦、异步通信和流量削峰的重要组件,扮演着不可或缺的角色。而RocketMQ,作为阿里巴巴开源的一款高性能、高可靠的分布式消息中间件,凭借其强大的功能和稳定的性能,成为了众多开发者和企业的首选。
无论你是刚刚接触消息队列的新手,还是希望深入了解RocketMQ的开发者,这篇文章都将为你提供一个清晰的入门指南。我们将从RocketMQ的基本概念出发,逐步讲解其核心功能,并通过简单的实践示例,帮助你快速上手。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 详解RocketMQ基础概念
# 为什么要用RocketMQ
相比于市场上的各种消息队列,它有如下优势:
- 性能好。
- 稳定可靠。
- 中文社区活跃。
当然缺点也是有那么一些些的,兼容性确实不太行。
# RocketMQ优缺点是什么
优点:
- 单机吞吐量为
10w级。 - 可用性很高,支持分布式架构。
- 扩展性好。
- 支持10亿级别的消息堆积,而且不会因为堆积导致性能下降。
- 源码是用
Java写的,对于Java程序员来说非常方便改造。 - 参数优化配置,消息基本可以做到0丢失。
- 使用于对可靠性要求高的金融行业。
缺点:
- 目前只支持
Java、C++客户端,而且C++还不算完善。 - 没有在
MQ核心实现JMS相关接口,有些迁移改造就比较麻烦了。
# 消息队列使用场景
- 解耦: 例如用户完成下单除了必要的库存扣减和订单状态更新外,我们还需要处理一些积分系统、推送系统的无关紧要的业务处理,如果全部顺序执行,等待时间就会变得很漫长,所以我们需要借助
MQ将边角业务从业务模块中解耦开来。

- 异步: 这点不必多说,上述的解耦方案就会使得积分系统、促销系统、推送系统任务异步执行。
- 削峰: 可以理解为一个漏斗,例如我们的某个服务只能抗住
10wQPS,可是当前请求却达到20w的QPS,那么我们就可以将请求全部先扔到MQ中,让服务慢慢消化处理。

# RocketMQ基础安装与实践
# 安装并启动RocketMQ
在编写业务代码之前,我们必须完成一下RocketMQ的部署,首先我们自然要下载一下RocketMQ,下载地址如下,笔者下载的是rocketmq-all-4.8.0-bin-release这个版本:
https://rocketmq.apache.org/download/ (opens new window)
完成完成后,我们将其解压到自定义的路径,键入sudo vim /etc/profile配置MQ环境变量,完成后键入source /etc/profile使之生效,对应的配置内容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
2
3
需要注意的是笔者本次采用WSL的Ubuntu子系统时启动时脚本会抛出runserver.sh: 70: [[: Exec format error错误,尝试格式化和指令配置后都没有很好的解决,于是循着报错找到runserver.sh这行对应的脚本内容,该括弧本质上就是基于JDK内容配置对应的GC算法:

以笔者为里系统是jdk8,所以直接去掉判断用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}
2
3
4
5
6
7
8
完成后键入./mqnamesrv &将MQ启动,如果弹窗输出下面这条结果,则说明mq的NameServer启动成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
2
3
然后我们再键入./mqbroker -n 127.0.0.1:9876启动broker,需要注意的是默认情况下broker占用堆内存差不多是4g,所以读者本地部署时建议修改一下runbroker.sh的堆内存,如下图所示:

若弹窗输出下面所示的文字,则说明broker启动成功,自此mq就在windows环境部署成功了。我们就可以开始编码工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
# 订单系统改造
本次的示例是关于订单系统改造,用户下单完成后,服务器需要进行库存扣减、订单状态更新、以及优惠券、积分等边边角角的业务,如果顺序执行这些逻辑+网络开销,接口耗时对于用户体验是非常不友好的。
所以我们在将非核心业务逻辑从接口串行调用中抽出,下单业务只需关注完成我们库存扣减、订单状态更新就行了,剩下的业务用MQ发个消息给积分系统、促销系统告知他们自己处理一下就行了:

首先我们引入MQ依赖脚手架:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2
3
4
5
同时这里我们也给出配置信息:
# mq地址端口
rocketmq.name-server=127.0.0.1:9876
# 生产者配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一
rocketmq.producer.group=rocketmq-group
rocketmq.producer.groupName=rocketmq-group
# namesrv地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 设置消息最大长度 4M
rocketmq.producer.maxMessageSize=4096
# 消息发送超时时间
rocketmq.producer.sendMsgTimeout=3000
# 消息发送失败重试次数
rocketmq.producer.retryTimesWhenSendFailed=2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
随后我们设置监听处理关于订单创建的topic消息:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSONUtil.toJsonStr(order));
}
}
2
3
4
5
6
7
8
9
完成后我们基于CommandLineRunner 测试一下消息发送:
@Component
@Slf4j
public class MQTest implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void run(String... args) throws Exception {
Order order = new Order();
order.setOrderNo("20221217001002003");
order.setUserId(1);
order.setPrice(500.00);
rocketMQTemplate.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build(), getDefaultSendCallBack());
}
/**
* 消息处理默认回调
* @return
*/
private SendCallback getDefaultSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("MQ消息发送成功。result:{}", JSONUtil.toJsonStr(sendResult));
}
@Override
public void onException(Throwable throwable) {
log.error("MQ消息发送失败,失败原因:{}" + throwable.getMessage());
}
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
日志如下,可以看到消息消费成功了:
2025-02-11 10:03:14.577 INFO 14420 --- [MessageThread_1] com.sharkChili.config.OrderMsgListener : 收到订单,订单信息:[{"userId":1,"orderNo":"20221217001002003","price":500}],进行积分系统、促销系统、推送系统业务处理.....
2025-02-11 10:03:14.577 INFO 14420 --- [ublicExecutor_2] com.sharkChili.runner.MQTest : MQ消息发送成功。result:{"sendStatus":"SEND_OK","msgId":"AC1E1001385418B4AAC235A7E0190000","messageQueue":{"topic":"ORDER_ADD","brokerName":"DESKTOP-DC9PSUS","queueId":2},"queueOffset":1,"offsetMsgId":"AC15733800002A9F0000000000000558","regionId":"DefaultRegion","traceOn":true}
2
# 如何实现消息过滤
设置tag消息的方式常见的是有两种,一种是基于tag标签过滤,如下代码所示,我们希望发送订单业务即ORDER_ADD这个主题下tag标签为tagA的用户收到消息,那么我们就可以通过ORDER_ADD:tagA针对topic进行更进一步划分:
//创建订单消息
Order order = new Order();
order.setUserId(1);
order.setOrderNo(UUID.randomUUID().toString());
order.setPrice(500);
//生成消息
Message<Order> message = MessageBuilder.withPayload(order)
.build();
//同步发送
rocketMQTemplate.syncSend("ORDER_ADD:tagA", message);
2
3
4
5
6
7
8
9
10
对应的监听者通过selectorExpression 指定标签即可:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
selectorExpression = "tagA"//订阅tagA的消息
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSONUtil.toJsonStr(order));
}
}
2
3
4
5
6
7
8
9
10
11
12
还有一种就是基于SQL过滤,因为表达式灵活,相对更强大一些,例如我们的消费者只处理userId为10以内的数据,那么消费者的监听就可以按照如下姿势进行配置:
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
topic = "ORDER_ADD",
selectorType = SelectorType.SQL92,//指令类型为sql表达式
selectorExpression = "userId<10"//过滤出id小于10的用户的订单
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSONUtil.toJsonStr(order));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
发送消息时,通过headers 指定本消息条件并通过convertAndSend发送即可:
//创建订单消息
Order order = new Order();
order.setUserId(1);
order.setOrderNo(UUID.randomUUID().toString());
order.setPrice(500);
//通过header携带条件告知当前userId为1
Map<String, Object> headers = new HashMap<>();
headers.put("userId", 1);
//生成消息
Message<Order> message = MessageBuilder.withPayload(order)
.build();
//发送
rocketMQTemplate.convertAndSend("ORDER_ADD", message, headers);
2
3
4
5
6
7
8
9
10
11
12
13
需要注意的是默认情况下,MQ是不支持SQL表达过滤,我们需要到conf目录下的broker.conf文件,添加enablePropertyFilter=true,然后键入如下指令降broker启动:
./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c ../conf/broker.conf
# 如何提交延时消息
延迟消息即需要消费者过一段时间后才能消费的消息,例如我们现在有个消息要求消费者10s后才能消费,那么我们就可以使用延迟消息,如下代码所示:
// 创建延迟消息
Message<String> rocketMessage = MessageBuilder.withPayload("this is delay msg").build();
// 发送延迟消息,timeout设置为10000即10s,delayLevel表示延迟等级,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,所以 3为10s
rocketMQTemplate.syncSend("delay_topic", rocketMessage, 10000,3);
log.info("延迟消息发送完成");
2
3
4
5
消费者代码:
@Component
@RocketMQMessageListener(consumerGroup = "delay_msg_group", topic = "delay_topic")
@Slf4j
public class DelayMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("收到延迟消息,消息内容:{}", JSONUtil.toJsonStr(msg));
}
}
2
3
4
5
6
7
8
9
输出结果,可以看到确实是10s后消费者采纳看到消息并消费:
2025-02-11 10:56:58.300 INFO 18568 --- [ main] com.sharkChili.runner.MQTest : 延迟消息发送完成
2025-02-11 10:57:08.307 INFO 18568 --- [MessageThread_1] com.sharkChili.config.DelayMsgListener : 收到延迟消息,消息内容:this is delay msg
2
3
4
5
# 小结
以上便是笔者对于RocketMQ基础实践的演示,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 参考
SpringBoot 轻松整合 RocketMq,得劲:https://zhuanlan.zhihu.com/p/347394874 (opens new window)
RocketMQ 安装 For Windows10 (完整版):https://blog.csdn.net/lichao_3013/article/details/100574517 (opens new window)
面渣逆袭(RocketMQ面试题八股文)必看:https://tobebetterjavaer.com/sidebar/sanfene/rocketmq.html (opens new window)
SpringBoot消息使用RocketMQ Tag:https://www.jianshu.com/p/0634b0bfa94e (opens new window)
RocketMQ 消息负载均衡策略解析——图解、源码级解析:https://juejin.cn/post/7084825497393168414 (opens new window)
RocketMQTemplate 解析:简化与 RocketMQ 消息系统的交互:https://blog.csdn.net/Li_WenZhang/article/details/142451171 (opens new window)
13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息 :https://blog.csdn.net/caoli201314/article/details/120248852 (opens new window)