禅与计算机 禅与计算机
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

sharkchili

计算机禅修者
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

  • 消息队列

    • Kafka

    • RocketMQ

      • 一文快速入门消息队列
      • 消息队列RocketMQ入门指南
      • 深度解析:基于 RocketMQ 实现分布式事务的技术实践与原理探究
      • RocketMQ容器化最佳实践
      • RocketMQ常见问题与深度解析
        • 写在文章开头
        • RocketMQ基础知识点
          • RocketMQ的架构是怎么样的(重要)
          • nameServer是如何发现Broker的?Broker如何注册到NameServer?
          • NameServer故障下线后,broker如何完成切换
          • nameServer是如何保证高可用?如果所有nameServer都下线会怎样?
          • 如何优化NameServer的可用性和性能?
          • broker是如何存储消息的
          • 消费者拉取消息时,broker还未刷盘,消息能否拉取到
          • 消息同步复制和异步复制的区别
          • 为什么RocketMQ吞吐量比RabbitMQ高?
        • RocketMQ进阶知识点
          • RocketMQ的事务消息是如何实现的
          • RocketMQ如何保证消息的顺序性
          • RocketMQ有几种集群方式
          • RocketMQ消息堆积了怎么解决
          • RocketMQ的工作流程详解
          • RocketMQ的消息是采用推模式还是采用拉模式
          • 用了RocketMQ一定能做到削峰吗
          • 常见消息队列的消息模型有哪些?RocketMQ用的是那种消息模型
          • RocketMQ消息的几种消费模式
          • 如何保证消息可用性和可靠性呢?
          • 如果避免消息重复消费问题(重点)
          • 延时消息底层是怎么实现
          • 什么是死信队列
          • Broker是进行消息持久化的
          • RocketMQ如何进行文件读写的呢?
          • 消息刷盘如何实现呢?
          • RocketMQ负载均衡
          • RocketMQ消息长轮询
          • 为什么Consumer要集群部署?可以部署多少个?
          • 生产环境用的是同步复制还是异步复制?同步刷盘还是异步刷盘?
          • 如何增加Topic的队列数?增加队列数会有什么影响?
        • 小结
        • 参考
  • Nginx

  • 中间件
  • 消息队列
  • RocketMQ
sharkchili
2026-03-25
目录

RocketMQ常见问题与深度解析

@[toc]

# 写在文章开头

在当今的分布式系统架构中,消息队列作为解耦、异步处理和流量削峰的核心组件,扮演着至关重要的角色。而RocketMQ作为阿里巴巴开源的一款高性能、高可用的分布式消息中间件,凭借其强大的功能和广泛的应用场景,成为众多企业和开发者关注的焦点。

无论是面试中还是实际工作中,掌握RocketMQ的核心原理和常见问题都显得尤为重要。本文将从基础到高级,全面总结RocketMQ的面试高频问题,并结合实际应用场景和源码解析,帮助读者深入理解其设计思想和实现机制。无论你是准备面试的技术人,还是希望提升对RocketMQ理解的开发者,相信这篇文章都能为你提供有价值的参考。

接下来,我们将从RocketMQ的基本概念、核心组件、消息存储与消费、高可用性设计以及性能优化等方面展开详细解析,助你轻松应对面试中的各种挑战!

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。

同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# RocketMQ基础知识点

# RocketMQ的架构是怎么样的(重要)

整体来说,RocketMQ是由如下几个部分组成:

  1. nameserver:提供服务发现和路由功能,负责维护broker的数据信息,包括broker地址、topic和queue等,对应的producer和consumer在启动时都需要通过nameserver获取broker和topic映射关系。
  2. broker:可以理解为消息中转服务器,主要负责消息存储和传输,即持久化生产者的消息以及处理消费者拉取请求将数据投递给消费者。RocketMQ支持多个broker构成集群,每个broker都有独立的存储空间和队列。
  3. producer:投递消息的生产者,通过nameserver找到主要负责的broker,并将消息投递到对应的队列上。
  4. consumer:订阅topic并从对应broker上拉取消息并消费。

初次之外还有一个关于消息的概念,即topic也就是消费主题,对于消息的逻辑分类的单位,producer消息都会发送到特定的topic上,对应的consumer就会从这些topic拿到消费的消息。

更多关于消息队列架构的,感兴趣的读者可以参考笔者这篇文章:

一文快速了解消息中间件:https://mp.weixin.qq.com/s/6vI05sZD_KOaj1YnyK67MA (opens new window)

# nameServer是如何发现Broker的?Broker如何注册到NameServer?

broker在启动阶段会向所有的nameServer发送服务注册申请,而nameServer由此就会得到broker基本进行并建立通信。后续broker也会定期向nameServer发送心跳包保持两者之间的长连接。

当然,如果broker宕机,nameServer也会将这个broker从自己的路由表中删除。

# NameServer故障下线后,broker如何完成切换

集群模式下,某个nameServer下线了,其他的nameserver还是可以对外提供服务的。 针对broker来说,上文提到broker会定期发送心跳包给nameServer进行心跳保活,一旦nameServer长时间没有收到broker的心跳包就会判定其下线。默认情况下,broker配置了DLedger,在进行故障切换时,它就会基于raft协议选一个新的broker作为master向nameserver注册。

# nameServer是如何保证高可用?如果所有nameServer都下线会怎样?

因为nameServer本身是无状态的,所以通过集群模式即可保证nameServer的高可用,如果nameServer所有集群都挂了可能存在如下风险:

  1. 如果producer和consumer没有缓存broker时,所有NameServer都挂了,那么整个生产者和消费者都无法完成消费投递和消费。
  2. 如果producer和consumer在NameServer下线前缓存了broker的信息,在基本架构没有改变的情况下可以正常服务,但是一旦broker信息改变了,那么生产者和消费者服务也是会瘫痪的。

所以这种问题我们优先从以下几个角度保证:

  1. 尽可能3台及以上的nameServer。
  2. 增加监控工具监控nameServer信息,感知下线时直接触发告警。

# 如何优化NameServer的可用性和性能?

保证可用性就是在集群部署架构和监控进行相应保证即可,这一点在上文中我们就已经提及。

性能发面我们大体可以从以下几个角度进行优化:

  1. 默认情况下,建议至少搭建3台的nameserver。
  2. 采用负载均衡机制将请求分散到不同的nameserver上。
  3. 适当延长生产者和消费者拉取broker信息的频率,减小nameserver的处理网络IO请求的压力。
  4. 实时监控nameserver的鉴康状态,设置告警规则,及时发现和处理异常。

# broker是如何存储消息的

我们都知道rocketmq进行消息持久化操作时基本是采用零拷贝技术,所以,当broker收到生产者消息投递请求时,它会进行如下的处理步骤:

  1. 它会通过mmap技术将数据写入到commitLog文件对应的内核缓冲区中,等异步刷盘到磁盘中。
  2. broker会为每个topic创建一个consumeQueue,所以上述操作完成后,broker还会记录本条消息在commitLog中的偏移量、大小、tag等信息,到consumerQueue/topic名称下的某个物理文件中。

# 消费者拉取消息时,broker还未刷盘,消息能否拉取到

实际上即使broker没有刷盘,consumer依然是可以拉取到这个消息的,原因很简单,MQ收到消息后会直接通过mmap技术将消息写入内核缓冲区中,这并不意味着消息无法被消费者拉取到:

刷盘机制更多是用于保证消息持久化保证MQ因为某些原因宕机重启后依然可以定位到这些消息对外提供服务,而非是没有持久化的消息无法被broker拉取并转发给consumer。

# 消息同步复制和异步复制的区别

该问题前提条件是MQ属于集群架构,对于同步模式而言,它要求broker收到消息投递的请求后,必须同步发送给slave节点,只有slave回复ack确认之后才能告知生产者本地消息投递完成:

而异步复制则是消息投递到master节点之后,无需同步到从节点即可告知生产者消息投递成功,让异步线程在后台自行完成消息同步工作。

总的来说,同步复制在网络延迟较高的情况下,这种同步的RT耗时会导致吞吐量下降,但是可以保证消息可靠性。而后者反之,保证了性能,却无法保证消息可靠性。

# 为什么RocketMQ吞吐量比RabbitMQ高?

最根本的原因就是在于RocketMQ进行消息存储时,用到了零拷贝技术和顺序刷盘技术:

  1. 通过零拷贝的mmap技术保证消息写入时避免用户态和内核态复制的开销,提升消息传输效率。
  2. 通过顺序刷盘技术提升的数据持久化到物理磁盘的效率。

# RocketMQ进阶知识点

# RocketMQ的事务消息是如何实现的

大体是通过half消息完成,大体工作流程为:

  1. 生产者即应用程序像mq的broker发送一条half消息,mq收到该消息后在事务消息日志中将其标记为prepared状态,然后回复ack确认。
  2. 生产者执行本地事务,将事务执行结果发送提交指令告知mq可以提交事务消息给消费者。
  3. mq若收到提交通知后,将消息从prepared改为commited,然后将消息提交给消费者,当然如果mq长时间没有收到提交通知则发送回查给生产者询问该事务的执行情况。
  4. 基于事务结果若成功则将事务消息提交给消费者,反之回滚该消息即将消息设置为rollback并将该消息从消息日志中删除,从而保证消息不被消费。

# RocketMQ如何保证消息的顺序性

针对保证消费顺序性的问题,我们可以基于下面这样的一个场景来分析,假设我们有一个下单请求,生产者要求按需投递下面这些消息让消费者消费:

  1. 创建订单
  2. 用户付款
  3. 库存扣减

同理消费者也得严格按照这个顺序完成消费,此时如果按照简单维度的架构来说,我们可以全局设置一个topic让生产者准确有序的投递每一个消息,然后消费者准确依次消费消息即可,但是这样做对于并发的场景下性能表现就会非常差劲:

为了适当提升两端性能比对消息堆积,我们选择增加队列用多个队列处理这个原子业务:

有了这样的架构基础,我们就需要考虑生产者和消费者的有序生产和有序消费的落地思路了,先来说说生产者有序投递,这样做比较简单,我们可以直接通过订单号进行hash并结合topic队列数进行取模的方式将一个订单的创建、余额扣减、库存扣减的消息有序投递到某个队列中,这样就能保证单个订单的业务消息有序性:

对应的我们也给出生产者的代码使用示例:

//基于订单号orderNo进行哈希取模发送订单消息
Message<Order> message = MessageBuilder.withPayload(order).build();
rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
1
2
3

这块哈希取模的实现可以从底层源码DefaultMQProducerImpl的sendSelectImpl看到,它会将arg(也就是我们的orderNo)通过selector的select进行运算获得单topic下的某个队列:

private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      //......
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
            	//传入arg也就是我们的orderNo基于selector算法进行哈希取模
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
               //......
            }

          //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

这个调用会来到SelectMessageQueueByHash的select,从源码可以看出这块代码看出,它的算法就是通过参数哈希运算后结合队列数(默认为4)进行取模:

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    //数值哈希运算
        int value = arg.hashCode();
     	//......
     	//结合队列数取模得到队列返回
        value = value % mqs.size();
        return mqs.get(value);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

消费者端就比较简单了,consumeMode指定为有序消费即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        consumeMode = ConsumeMode.ORDERLY//同一个topic下同一个队列只有一个消费者线程消费
        
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSONUtil.toJsonStr(order));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

这里我们也基于源码解析一下这个有序消费的实现,本质上消费者启动的时候会开启一个定时任务尝试获取分布式上锁队列信息的执行如下步骤:

  1. 获取知道的broker及其队列。
  2. 获取对应broker的master地址。
  3. 发送请求到服务端询问master获取所有队列的分布式锁。
  4. 基于请求结果获取查看那些队列上锁成功。
  5. 更新本地结果。

完成后,消费者就拉取到全局可唯一消费的队列信息,因为每个消费者都是基于多线程执行,所以为了保证本地多线程消费有序性,每一个线程进行消费时都会以消息队列messageQueue作为key用synchronized上锁后才能消费。

代码如下所示,可以看到上锁成功后就会执行messageListener.consumeMessage方法,该方法就会走到我们上文中声明的监听上了:

public void run() {
            //......
            //消费请求线程消费前会获取消息队列锁
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            //上锁
            synchronized (objLock) {
                 //......
                                //将消息发送给实现有序监听的监听器线程
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                 //......
                            } finally {
                                 //......
                            }
  //......
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# RocketMQ有几种集群方式

  1. 多master模式:该模式仅由多master构成,配置比较简单,单个master宕机或重启对于应用全局没有任何影响,尤其在磁盘为RAID10的情况下,即使服务器不可恢复,只要我们使用同步刷盘策略,基本上消息都不会丢失,而缺点也非常明显,单台机器宕机期间,这台机器未被消费的消息在恢复之前不可订阅,消息的实时性会受影响。
  2. 多master-多slave异步复制:多个master和多个slave构成,即使master宕机后slave依然可以对外提供服务,所以消息实时性不会受影响,缺点是主从复制是异步的,如果master宕机时同步的消息可能丢失部分,且没有解决slave自动切换为master。
  3. 多master-多slave同步复制:上者的优化版即同步策略采用同步的方式,保证在RAID10的情况下消息基本不丢失,但因采用的是同步复制,所以发送单个消息的RT可能略高,且同样没有解决slave自动切换为master。
  4. Dledger模式:该集群模式要求至少由3个broker构成,即一个master必须对应两个slave,一旦某个master宕机后通过raft一致性算法选举新的master对外提供服务。具体实践可以参考:https://rocketmq.apache.org/zh/docs/bestPractice/02dledger/ (opens new window)

# RocketMQ消息堆积了怎么解决

消息堆积的原因比较多,大体是客户端队列并发度不够或者客户端消费能力不足,所以我们可以针对以下几个角度进行针对性的优化:

  1. 增加消费者实例:如果是消费速度过慢导致的消息堆积,则建议增加消费者数量,让更多的实例来消费这些消息。
  2. 提升消费者消费速度:如果是消息消费处理耗时长,则针对性的业务流程调优,例如引入线程池、本地消息存储后立即返回成功等方式提前消息进行批量的预消化。
  3. 降低生产者速度:如果生产端可控且消费者已经没有调优的空间时,我们建议降低生产者生产速度。
  4. 清理过期消息:对于一些过期且一直无法处理成功的消息,在进行业务上的评估后,我们可以针对性的进行清理。
  5. 增加topic队列数:如果是因为队列少导致并发度不够可以考虑增加一下消费者队列,来提升消息队列的并发度。
  6. 参数调优:我们可以针对各个节点耗时针对:消费模式、消息拉取间隔等参数进行优化。

# RocketMQ的工作流程详解

上文已经介绍了几个基本的概念,我们这里直接将其串联起来:

  1. 启动nameServer,等待broker、producer和consumer的接入。
  2. 启动broker和nameserver建立连接,并定时发送心跳包,心跳包中包含broker信息(ip、端口号等)以及topic以及broker与topic的映射关系。
  3. 启动producer,producer启动时会随机通过nameserver集群中的一台建立长连接,并从中获取发送的topic和所有broker地址信息,基于这些信息拿到topic对应的队列,与队列所在的broker建立长连接,自此开始消息发送。
  4. broker接收producer发送的消息时,会根据配置同步和刷盘策略进行状态回复:

1. 若为同步复制则master需要复制到slave节点后才能返回写状态成功
2. 若配置同步刷盘,还需要基于上述步骤再将数据写入磁盘才能返回写成功
3. 若是异步刷盘和异步复制,则消息一到master就直接回复成功
1
2
3
4
  1. 启动consumer,和nameserver建立连接然后订阅信息,然后对感兴趣的broker建立连接,获取消息并消费。

# RocketMQ的消息是采用推模式还是采用拉模式

消费模式分为3种:

  1. push:服务端主动推送消息给客户端。
  2. pull:客户端主动到服务端轮询获取数据。
  3. pop:5.0之后的新模式,后文会介绍。

总的来说push模式是性能比较好,但是客户端没有做好留空,可能会出现大量消息把客户端打死的情况。 而poll模式同理,频繁拉取服务端可能会造成服务器压力,若设置不好轮询间隔,可能也会出现消费不及时的情况,

整体来说RocketMQ本质上还是采用pull模式,具体后文会有介绍。

# 用了RocketMQ一定能做到削峰吗

削峰本质就是将高并发场景下短时间涌入的消息平摊通过消息队列构成缓冲区然后平摊到各个时间点进行消费,从而实现平滑处理。

这也不意味着用mq就一定可以解决问题,假如用push模式,这就意味着你的消息都是mq立即收到立即推送的,本质上只是加了一个无脑转发的中间层,并没有实际解决问题。 所以要想做到削峰,就必须用拉模式,通过主动拉去保证消费的速度,让消息堆积在mq队列中作为缓冲。

# 常见消息队列的消息模型有哪些?RocketMQ用的是那种消息模型

消息队列的消息模型有两种,一种是队列模型(点对点模型),生产者负责把消息投递到消息队列中,消费者去消息队列中抢消息,消息只能被一个消费者消费,先到者先得:

还有一种就是发布/订阅模型了,发布订阅模型的消息只要消费者有订阅对应的topic主题,就都能够消费这份消息,而RocketMQ就是典型的发布订阅模型:

我们简单介绍一下rocketMQ中group的概念,从5.X之后的版本开始生产者组已经是匿名的,这也就意味着我们已经无需管理这个部分,对于消费者而言,MQ的消费者都是以消费者组为单位进行消息消费,以RocketMQ的架构为例,它按照topic划分业务层面的消息例如:体育新闻topic和娱乐新闻topic,以下图为例,它通过2个队列queue来维护test-topic中的消息,通过这个架构也不难看出,它只能保证队列维度的消费有序,不能保证topic维度的消费有序:

如下所示,笔者又新建一个监听者,用的是不同的消费者组consumerGroup,运行时即可看到两组订阅的消费者消费一份消息:

@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {

    private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);


    @Override
    public void onMessage(Order order) {

        logger.info("订阅者2收到消息,订单信息:[{}],进行新春福利活动.....", JSON.toJSONString(order));

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# RocketMQ消息的几种消费模式

有两种消费模式:

  1. 集群消费:这种是RocketMQ默认模式,一个主题下的多个队列都会被消费者组中的某个消费者消费掉。
  2. 广播消费:广播消费模式会让每个消费者组中的每个消费者都能使用这个消息。

# 如何保证消息可用性和可靠性呢?

这个问题我们要从3个角度考虑:

对于生产阶段,生产者发送消息要想确保可靠必须遵循以下3点:

  1. 没有发送成功就需要进行重试:
		SendResult result = producer.send(message);
                if (!"SEND_OK".equals(result.getSendStatus().name())){
                    logger.warn("消息发送失败,执行重试的逻辑");
                }
1
2
3
4
  1. 如果发送超时,我们可以从日志相关API中查看是否存到Broker中。
  2. 如果是异步消息,则需要到回调接口中做相应处理。

针对存储阶段,存储阶段要保证可靠性就需要从以下几个角度保证:

  1. 开启主从复制模式,使得Master挂了还有Slave可以用。
  2. 为了确保发送期间服务器宕机的情况,我们建议刷盘机制改为同步刷盘,确保消息发送并写到CommitLog中再返回成功。

这里补充一下同步刷盘和异步刷盘的区别:

  1. 同步刷盘,生产者投递的消息持久化时必须真正写到磁盘才会返回成功,可靠性高,但是因为IO问题会使得组件处理效率下降。
  2. 异步刷盘,如下图所示,可以仅仅是存到page cache即可返回成功,至于何时持久化操磁盘由操作系统后台异步的页缓存置换算法决定。

对应的刷盘策略,我们只需修改broker.conf的配置文件即可:

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
1
2
3
4

对于消费阶段,消费者编码逻辑一定要确保消费成功了再返回消费成功:

consumer.registerMessageListener((List<MessageExt> msgs,
                                          ConsumeConcurrentlyContext context) -> {
            String msg = new String(msgs.stream().findFirst().get().getBody());
            logger.info("消费收到消息,消息内容={}", msg);
            
            //消费完全成功再返回成功状态
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
1
2
3
4
5
6
7
8

# 如果避免消息重复消费问题(重点)

导致消息消费重复的原因大体有如下3个:

  1. 生产者提交成功的消息,因为网络波动没收到正确响应进而重新提交。
  2. 消费者完成消费后因为某些原因没有更新offset。
  3. rebanlance机制重平衡导致消息被重复拉取消费。

有些场景下,我们只需保证业务幂等即可,例如:我们需要给订单服务发送一个用户下单成功的消息,无论发送多少次订单服务只是将订单表状态设置成已完成,针对这种场景,我们只要在功能设计上保证幂等即可:

还有一种方式就是业务去重,例如我们现在要创建订单,每次订单创建完都会往一张以消息的msg_id为主键的表中记录消费信息表中插入数据,利用主键冲突来避免重复消费:

最后一种方案和上述消息表差不多,即通过redis的setNx执行避免重复消费,这里就不多做重复了。

# 延时消息底层是怎么实现

我们都知道投递消息到消息队列的时候,消息都会写入到commitLog上,在此之前MQ会检查当前消息延迟等级是否大于0,如果是则说明该消息是延迟消息,则会将其topic设置为RMQ_SYS_SCHEDULE_TOPIC并基于延迟等级获取对应的队列,最后基于零拷贝的方式写入磁盘,注意此时消息还不可被消费:

对此我们也给出这段投递消息的源码,即位于CommitLog的asyncPutMessage异步投递消息的方法:

 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
       //......

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
           
            //如果大于0则说明是延迟消息
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //设置topic设置为SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //基于等级获取延迟消息队列
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

              	//......
                //基于上述设置topic和队列信息
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

     		  //......
            //基于零拷贝的方式添加
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

MQ启动的时候底层的消息调度服务会基于延迟消息的等级初始化几个任务,这些任务会基于定时的间隔检查是否有到期的消息到来,如果到期则将其投递到真正topic的队列中供消费者消费:

基于此逻辑我们也给出ScheduleMessageService的start方法查看调度器的初始化逻辑,可以看到初始化阶段,它会遍历所有延迟级别并为其初始化一个定时任务:

public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //遍历所有延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                //设置timer定时器
                if (timeDelay != null) {
                    //投递给定时器对应等级的定时任务
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
          //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

查看DeliverDelayedMessageTimerTask的核心逻辑即run方法,也就是我们所说的定时检查是否有到期消息,若存在则将其存入原本的topic上,消费者就可以消费了:

@Override
        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
              //......
            }
        }


public void executeOnTimeup() {

            //基于topic和队列id获取延迟队列
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                //根据偏移量获取有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                      //......
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                              //......

                            long now = System.currentTimeMillis();
                            //计算可消费时间
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;
                            //如果小于0说明可消费
                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //清除延迟级别恢复到真正的topic和队列id
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                      //......
                                        //放到消息队列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                         //......
                                        } else {
                                           //......
                                    } catch (Exception e) {
                                       //......
                               
                            } else {
                                //......
                            }
                        } // end of for

                         //......
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# 什么是死信队列

通俗来说一个消息消费失败并重试达到最大次数后,MQ就会将其放到死信队列中。超过三天该消息就会被销毁。 需要补充的时死信队列是针对一个group id为单位创建的队列,如果一个gourp中都没有死信的话,那么MQ就不会为这个组创建死信队列。

# Broker是进行消息持久化的

要想了解Broker如何保存数据,我们必须了解RocketMQ三大文件:

首先是commitlog,producer发送的消息最终都会通过刷盘机制存到commitlog文件夹下。commitlog下一个文件名为00000000000000000000一旦写满,就会再创建一个文件写,一般来说第二个文件名为00000000001073741824,名称即是第一个文件的字节数。文件大小一般是1G:

然后是consumequeue文件夹,这个文件夹下记录的都是commitlog中每个topic下的队列信息物理偏移量、消息大小、hashCode值,如下图,consumequeue文件夹下会为每个topic创建一个文件夹:

打开任意一个文件夹就会看到这样一个名为00000000000000000000的文件:

而这个文件内部最多维护30w个条目,注意文件中每个条目大约20字节,8字节代表当前消息在commitLog中的偏移量,4字节存放消息大小,8字节存放tag和hashCode的值。

最后就算index,维护消息的索引,基于HashMap结构,这个文件使得我们可以通过key或者时间区间查询消息:

文件名基本用时间戳生成的,大小一般为400M,差不多维护2000w个索引:

简单小结一下RocketMQ持久化的物理文件:MQ会为每个broker维护一个commitlog,一旦文件存放到commitlog,消息就不会丢失。当无法拉取消息时,broker允许producer在30s内发送一个消息,然后直接给消费者消费。

后两个索引文件的维护是基于一个线程ReputMessageService进行异步维护consumeQueue(逻辑消费队列)和IndexFile(索引文件)数据:

# RocketMQ如何进行文件读写的呢?

对于读写IO处理有以下两种

  1. pageCache:在RocketMQ中,ConsumeQueue存储数据较少,并且是顺序读取,在pageCache预读的机制下读取速率是非常客观的(即使有大量的消息堆积)。操作系统会将一部分内存用作pageCache,当数据写入磁盘会先经过pageCache然后通过内核线程pdflush写入物理磁盘。 针对ConsumeQueue下关于消息索引的数据查询时,会先去pageCache查询是否有数据,若有则直接返回。若没有则去ConsumeQueue文件中读取需要的数据以及这个数据附近的数据一起加载到pageCache中,这样后续的读取就是走缓存,效率自然上去了,这种磁盘预读目标数据的附近数据就是我们常说的局部性原理。而commitLog随机性比较强特定情况下读写性能会相对差一些,所以在操作系统层面IO读写调度算法可以改为deadline并选用SSD盘以保证操作系统在指定时间完成数据读写保证性能。

  2. 零拷贝技术:这是MQ基于NIO的FileChannel模型的一种直接将物理文件映射到用户态内存地址的一种技术,通过MappedByteBuffer,它的工作机制是直接建立内存映射,文件数据并没有经过JVM和操作系统直接复制的过程,相当于直接操作内存,所以效率就非常高。可以参考: 能不能给我讲讲零拷贝:https://mp.weixin.qq.com/s/zS2n2a4h3YQifBYKFgcUCA (opens new window)

# 消息刷盘如何实现呢?

两种方式分别是同步刷盘和异步刷盘

  1. 同步刷盘: producer发送的消息经过broker后必须写入到物理磁盘commitLog后才会返回成功。
  2. 异步刷盘:producer发送的消息到达broker之后,直接返回成功,刷盘的逻辑交给一个异步线程实现。

而上面说的刷盘都是通过MappedByteBuffer.force() 这个方法完成的,需要补充异步刷盘是通过一个异步线程FlushCommitLogService实现的,其底层通过MappedFileQueue针对内存中的队列消息调用flush进行刷盘从而完成消息写入:

public boolean flush(final int flushLeastPages) {
        boolean result = true;
        //拉取文件处理偏移量信息
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            //基于mmap零拷贝技术进行刷盘
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            //如果刷盘后的进度和预期一样说明刷盘成功
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                //维护处理时间
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# RocketMQ负载均衡

MQ中负载均衡的主要是体现在生产者端和消费者端,Producer负载均衡算法在上述中有序消费中的源码已经说明,这里就不多做赘述,本质上就是通过底层的selector进行轮询投递:

  Message<Order> message = MessageBuilder.withPayload(order).build();
        rocketMQTemplate.syncSendOrderly("ORDER_ADD", message, order.getOrderNo());
1
2

再来consumer负载均衡算法,mq客户端启动时会开启一个负载均衡服务执行负载均衡队列轮询逻辑,通过负载均衡算法得出每个消费者应该处理的队列信息后生产拉取消息的请求,交由有MQ客户端去拉取消息:

默认情况下,负载均衡算法选定队列后拉取消息进行消费,默认情况下它会根据队列数和消费者数决定如何进行负载分担,按照平均算法:

  1. 如果消费者数大于队列数,则将队列分配给有限的几个消费者。
  2. 如果消费者数小于队列数,默认情况下会按照队列数/消费者数取下限+1进行分配,例如队列为4,消费者为3,那么每个消费者就会拿到2个队列,其中第三个消费者则没有处理任何数据。

对应的我们给出MQ客户端初始化的代码RebalanceService的run方法,可以看到它会调用mqClientFactory执行负载均衡方法doRebalance:

@Override
    public void run() {
     //.......

        while (!this.isStopped()) {
           //.......
            //客户端执行负载均衡
            this.mqClientFactory.doRebalance();
        }

       //.......
    }
1
2
3
4
5
6
7
8
9
10
11
12

步入其内部逻辑会走到RebalanceImpl的doRebalance,它遍历每个topic进行负载均衡运算:

public void doRebalance(final boolean isOrder) {
       //......
        if (subTable != null) {
            //遍历每个topic
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    //计算该topic中当前消费者要处理的队列
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                   //......
                }
            }
        }

       //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

最终我们来到了核心逻辑rebalanceByTopic方法,可以看到它会基于我们查到的topic的队列和消费者通过策略模式找到对应的消息分配策略AllocateMessageQueueStrategy从而算得当前消费者需要处理的队列,然后在基于这份结果调用updateProcessQueueTableInRebalance生成pullRequest找服务端拉取消息:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
               //......
            }
            case CLUSTERING: {
                //根据主题获取消息队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //根据 topic 与 consumerGroup 获取所有的 consumerId
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                //......

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //// 排序后才能保证消费者负载策略相对稳定
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //按负载策略进行分配,返回当前消费者实际订阅的messageQueue集合
                        allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                        return;
                    }

                  //......

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    //......
                break;
            }
            default:
                break;
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

对应的我们也给出负载均衡算法AllocateMessageQueueAveragely的源码,大体算法和笔者上述说明的基本一致,读者可以参考上图讲解了解一下:

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
       //......
        //获取消费者id对应的索引
        int index = cidAll.indexOf(currentCID);
        //基于队列总数和客户端总数进行取模
        int mod = mqAll.size() % cidAll.size();
        /**
         *计算每个消费者的可消费的平均值:
         * 1. 如果消费者多于队列就取1
         * 2. 如果消费者少于队列就按照取模结果来计算
         */
        
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        //基于当前客户端的索引定位其处理的队列位置
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        //获取消费者的队列消费范围
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        //遍历队列存入结果集
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

完成后基于这份结果生成pullRequest存入pullRequestQueue中:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder) {
       //......

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
              		 //......
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                     //......
                    } else {
                    //生成pullRequest
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
		//存入pullRequestQueue中
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

最后消费者的PullMessageService这个线程就会从队列中取出该请求向MQ服务端发起消息拉取请求:

@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //获取一个拉消息的请求pullRequest 
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# RocketMQ消息长轮询

消费者获取消息大体有两种方案:

  1. 消息队列主动push:由消息队列主动去推送消息给消费者,高并发场景下,对于服务端性能开销略大。
  2. 消费者定期pull消息:由客户端主动去拉取消息,但是需要客户端设置好拉取的间隔,太频繁对于消息队列开销还是很大,间隔太长消息实时性又无法保证。

对此RocketMQ采用长轮询机制保证了实时性同时又降低了服务端的开销,总的来说,它的整体思路为:

  1. 消费者发起一个消费请求,内容传入topic、queueId和客户端socket、pullFromThisOffset等数据。
  2. 服务端收到请求后查看该队列是否有数据,若没有则挂起。
  3. 在一个最大超时时间内定时轮询,如果有则将结果返回给客户端。
  4. 反之处理超时,也直接告知客户端超时了也没有消息。

对应的我们再次给出消费者拉取消费的源码PullMessageService的run方法,可以看到其内部不断从阻塞队列中拉取请求并发起消息拉取:

 @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                //获取一个拉消息的请求
                PullRequest pullRequest = this.pullRequestQueue.take();
                //拉消息
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

服务端的PullMessageProcessor的processRequest就是处理请求的入口,可以看到该方法如果发现broker没有看到新的消息就会调用suspendPullRequest将客户端连接hold住:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        		//.......

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
					//.......
                    } else {
                       	//.......
                    break;
                case ResponseCode.PULL_NOT_FOUND://没拉取到消息

                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        //定位请求的topic以及offset和队列id
                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        //基于上述数据生成pullRequest并调用suspendPullRequest将其hold住
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                   	//.......
                    break;
                default:
                    assert false;
            }
        } else {
          	//.......
        }

        	//.......
        return response;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

然后PullRequestHoldService就会定时检查是否有对应的新消息到来,然后遍历所有挂起的请求,将新消息下发,反之若挂起的请求长时间没收到消息则直接超时结束:

 @Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                //等待
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                //基于超时时限内定时查看topic中的队列是否有新消息,如果有或者超时则返回
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                //......
            }
        }

       //......
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

这里我们也给出checkHoldRequest的调用可以看到,如果查到队列offset大于用户传的说明就有新消息则返回,超时则直接返回:

 public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        //......
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();
            if (requestList != null) {
              //......

                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    //拉取到新消息就返回
                    if (newestOffset > request.getPullFromThisOffset()) {
                        //......

                        if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                    request.getRequestCommand());
                            } catch (Throwable e) {
                                log.error("execute request when wakeup failed.", e);
                            }
                            continue;
                        }
                    }
                    //超时也返回
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }

                    replayList.add(request);
                }

                //......
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 为什么Consumer要集群部署?可以部署多少个?

在消息堆积的情况下,如果明确是因为topic过多导致消费者未能及时将这些消息分散消费的情况下,可以考虑增加consumer提升消费能力,部署的个数可以结合对应消息topic对应的队列数量决定,尽可能保证一比一的动态配置:

唯一需要注意的就是consumer的数量尽量不要超过topic,一旦超过就可能存在某些consumer消费不到消息的问题。

# 生产环境用的是同步复制还是异步复制?同步刷盘还是异步刷盘?

考虑用同步复制保证消息可靠性,通过异步刷盘减小一次消息投递请求的耗时进,保证尽早将消息投递结果告知生产者同时保证消息可靠性。

# 如何增加Topic的队列数?增加队列数会有什么影响?

默认是4,我们可以通过修改broker.conf修改队列数:

# 启用自动创建 Topic(默认已开启,但需配置默认队列数)
autoCreateTopicEnable=true
# 设置自动创建 Topic 的默认队列数(例如 16)
defaultTopicQueueNums=16
1
2
3
4

也可以通过命令行进行修改:

# 语法
sh mqadmin updateTopic -n <NameServer地址> -t <Topic名称> -w <新队列数>

# 示例:将 Topic 队列数改为 16
sh mqadmin updateTopic -n 127.0.0.1:9876 -t your-topic -w 16
1
2
3
4
5
  1. 临时增加topic队列数可能会导致消息分散进而导致消息顺序性被破坏。
  2. 增加过多队列,会增加broker维护各个队列存储和索引的负载。
  3. 增加topic队列后消息会引发消费者重平衡,会导致短暂的消费延迟。

# 小结

我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。

同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 参考

面渣逆袭(RocketMQ面试题八股文)必看:https://tobebetterjavaer.com/sidebar/sanfene/rocketmq.html (opens new window)

SpringBoot消息使用RocketMQ Tag:https://www.jianshu.com/p/0634b0bfa94e (opens new window)

RocketMQ 消息负载均衡策略解析——图解、源码级解析:https://juejin.cn/post/7084825497393168414 (opens new window)

Rocketmq源码分析12:consumer 负载均衡:https://juejin.cn/post/6956391196981723167 (opens new window)

源码分析RocketMQ消息PULL-长轮询模式:https://blog.csdn.net/prestigeding/article/details/79357818 (opens new window)

从RocketMQ看长轮询(Long Polling):https://juejin.cn/post/6844903791653814279#heading-7 (opens new window)

图解 RocketMQ 架构:https://blog.csdn.net/weixin_45304503/article/details/140248110 (opens new window)

RocketMQ延迟消息的代码实战及原理分析:https://zhuanlan.zhihu.com/p/157444529 (opens new window)

RocketMQ的顺序消息(顺序消费):https://blog.csdn.net/weixin_43767015/article/details/121028059 (opens new window)

RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码:https://blog.csdn.net/weixin_43767015/article/details/129103030 (opens new window)

四选一?这 4 种高可用 RocketMQ 集群搭建方案,我推荐最后一种! :https://zhuanlan.zhihu.com/p/322426257 (opens new window)

消息队列漫谈:什么是消息模型?:https://zhuanlan.zhihu.com/p/99791229 (opens new window)

RocketMQ二面通关秘籍:面试官问得太细,这些问题你准备好了吗? :https://mp.weixin.qq.com/s/DZf82QjtbaJiYqJYAtmFeQ (opens new window)

编辑 (opens new window)
上次更新: 2026/03/26, 01:05:31
RocketMQ容器化最佳实践
Linux下的nginx安装

← RocketMQ容器化最佳实践 Linux下的nginx安装→

最近更新
01
基于EasyExcel实现高效导出
03-25
02
从开源框架中学习那些实用的位运算技巧
03-25
03
浅谈分布式架构设计思想和常见优化手段
03-25
更多文章>
Theme by Vdoing | Copyright © 2025-2026 Evan Xu | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×
×