禅与计算机 禅与计算机
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

sharkchili

计算机禅修者
首页
  • Java基础

    • 聊一聊java一些核心知识点
    • 聊聊java面向对象核心知识点
    • 聊聊Java中的异常
    • 聊聊Java中的常用类String
    • 万字长文带你细聊Java注解本质
    • 来聊聊Java的反射机制
    • 深入解析Java泛型的魅力与机制
    • Java集合框架深度解析与面试指南
    • Java常用集合类HashMap深度解析
    • LinkedHashMap源码到面试题的全解析
    • 深入解析CopyOnWriteArrayList的工作机制
    • Java基础IO总结
    • Java三大IO模型小结
    • Java BIO NIO AIO详解
    • Java进阶NIO之IO多路复用详解
    • Java8流式编程入门
    • 一文速通lambda与函数式编程
    • Java8函数式方法引用最佳实践
  • Java并发编程

    • Java并发编程基础小结
    • 深入理解Java中的final关键字
    • 浅谈Java并发安全发布技术
    • 浅谈Java并发编程中断的哲学
    • Java线程池知识点小结
    • 浅谈Java线程池中拒绝策略与流控的艺术
    • synchronized关键字使用指南
    • 深入源码解析synchronized关键字
    • 详解JUC包下的锁
    • 详解并发编程中的CAS原子类
    • LongAdder源码分析
    • AQS源码解析
    • 深入剖析Java并发编程中的死锁问题
    • Java并发容器总结
    • 详解Java并发编程volatile关键字
    • 并发编程ThreadLocal必知必会
    • CompletableFuture基础实践小结
    • CompletableFuture异步多任务最佳实践
    • 硬核详解FutureTask设计与实现
    • 线程池大小设置的底层逻辑与场景化方案
    • 来聊一个有趣的限流器RateLimiter
  • JVM相关

    • 从零开始掌握 JVM
    • JVM核心知识点小结
    • JVM指令集概览:基础与应用
    • JVM类加载器深度解析
    • JVM方法区深度解析
    • Java内存模型JMM详解
    • Java对象大小的精确计算方法
    • 逃逸分析在Java中的应用与优化
    • 从零开始理解JVM的JIT编译机制
    • G1垃圾回收器:原理详解与调优指南
    • JVM故障排查实战指南
    • JVM内存问题排错最佳实践
    • JVM内存溢出排查指南
    • 简明的Arthas使用教程
    • 简明的Arthas配置及基础运维教程
    • 基于Arthas Idea的JVM故障排查与指令生成
    • 基于arthas量化监控诊断java应用方法论与实践
    • 深入剖析arthas技术原理
  • 深入理解Spring框架

    • Spring 核心知识点全面解析
    • Spring核心功能IOC详解
    • Spring AOP 深度剖析与实践
    • Spring 三级缓存机制深度解析
    • 深入 Spring 源码,剖析设计模式的落地实践
    • 探索 Spring 事务的奥秘
    • 深入解析Spring Bean的生命周期管理
    • 解读 Spring Boot 核心知识点
    • Spring Boot 启动优化实战:1分钟到13秒的排查与优化之路
    • Spring Boot自动装配原理及实践
    • 一文快速上手Sharding-JDBC
    • sharding-jdbc如何实现分页查询
    • 基于DynamicDataSource整合分库分表框架Shardingsphere
  • 计算机组成原理

    • 计算机硬件知识小结
    • CPU核心知识点小结
    • 浅谈CPU流水线的艺术
    • 从Java程序员视角聊聊CPU缓存
    • CPU任务调度和伪共享问题小结
    • CPU MESI缓存一致性协议
    • CPU内存管理机制
    • 内存深度解析
    • 磁盘存储原理
    • 详解计算机启动步骤
    • CPU南北桥架构与发展史
    • CPU中断机制与硬件交互详解
  • 操作系统

    • 如何实现一个高性能服务器
    • Linux文件结构与文件权限
    • Linux常见压缩指令小结
    • Linux核心系统调用详解
    • Linux进程管理
    • Linux线程管理
    • 进程与线程深度解析
    • Linux进程间通信机制
    • 零拷贝技术原理与实践
    • CPU缓存一致性问题深度解析
    • IO任务与CPU调度艺术
  • 计算机网络

    • 网卡通信原理详解
    • 网卡数据包处理指南
    • 基于抓包详解TCP协议
  • 编码最佳实践

    • 浅谈现代软件工程TDD最佳实践
    • 浅谈TDD模式下并发程序设计与实现
    • 面向AI编程新范式Trae后端开发环境搭建与实践
    • 基于提示词工程的Redis签到功能开发实践
    • 基于Vibe Coding的Redis分页查询实现
    • 告别AI无效对话:资深工程师的提示词设计最佳实践
  • 实用技巧与配置

    • Mac常用快捷键与效率插件指南
    • Keynote技术科普短视频制作全攻略
  • 写作

    • 写好技术博客的5大核心原则:从认知科学到AI工具的全流程指南
  • 开发工具

    • IDEA配置详解与高效使用指南
  • Nodejs
  • 博客搭建
  • Redis

    • Redis核心知识小结
    • 解锁Redis发布订阅模式
    • 掌握Redis事务
    • Redis主从复制技术
    • Redis的哨兵模式详解
    • 深度剖析Redisson分布式锁
    • 详解redis单线程设计思路
    • 来聊聊Redis所实现的Reactor模型
    • Redis RDB持久化源码深度解析
    • 来聊聊redis的AOF写入
    • 来聊聊Redis持久化AOF管道通信的设计
    • 来聊聊redis集群数据迁移
    • Redis SDS动态字符串深度解析
    • 高效索引的秘密:redis跳表设计与实现
    • 聊聊redis中的字典设计与实现
  • MySQL

    • MySQL基础知识点小结
    • 解读MySQL 索引基础
    • MySQL 索引进阶指南
    • 解读MySQL Explain关键字
    • 探秘 MySQL 锁:原理与实践
    • 详解MySQL重做日志redolog
    • 详解undoLog在MySQL MVCC中的运用
    • MySQL二进制日志binlog核心知识点
    • MySQL高效插入数据的最佳实践
    • MySQL分页查询优化指南
    • MySQL流式查询的奥秘与应用解析
    • 来聊聊分库分表
    • 来聊聊大厂常用的分布式ID生成方案
  • ElasticSearch

    • 从Lucene到Elasticsearch:进化之路
    • ES 基础使用指南
    • ElasticSearch如何写入一篇文档
    • 深入剖析Elasticsearch文档读取原理
    • 聊聊ElasticSearch性能调优
    • Spring借助Easy-Es操作ES
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • Netty Reactor模型常见知识点小结
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解
    • Netty解码器源码解析
  • 消息队列

    • 一文快速入门消息队列
    • 消息队列RocketMQ入门指南
    • 基于RocketMQ实现分布式事务
    • RocketMQ容器化最佳实践
    • RocketMQ常见问题与深度解析
    • Kafka快速安装与使用指南
  • Nginx

    • Linux下的nginx安装
    • Nginx基础入门总结
    • Nginx核心指令小结
    • Nginx进程结构与核心模块初探
    • Nginx应用进阶HTTP核心模块配置
    • Nginx缓存及HTTPS配置小记
    • nginx高可用实践简记
    • Nginx性能优化
  • 微服务基础

    • 微服务基础知识小结
    • 分布式事务核心概念小结
    • OpenFeign核心知识小结
    • 微服务组件Gateway核心使用小结
    • 分布式事务Seata实践
    • 用 Docker Compose 完成 Seata 的整合部署
  • Nacos

    • Nacos服务注册原理全解析
    • Nacos服务订阅流程全解析
    • Nacos服务变更推送流程全解析
    • 深入解析SpringCloud负载均衡器Loadbalancer
    • Nacos源码环境搭建与调试指南
  • Seata

    • 深度剖析Seata源码
  • Docker部署

    • 一文快速掌握docker的理念和基本使用
    • 使用docker编排容器
    • 基于docker-compose部署微服务基本环境
    • 基于docker容器化部署微服务
    • Gateway全局异常处理及请求响应监控
    • Docker图形化界面工具Portainer最佳实践
  • Go基础

    • 一文带你速通Go语言基础语法
    • 一文快速掌握Go语言切片
    • 来聊聊go语言的hashMap
    • 一文速通go语言类型系统
    • 浅谈Go语言中的面向对象
    • go语言是如何实现协程的
    • 聊聊go语言中的GMP模型
    • 极简的go语言channel入门
    • 聊聊go语言基于epoll的网络并发实现
    • 写给Java开发的Go语言协程实践
  • mini-redis实战

    • 来聊聊我用go手写redis这件事
    • mini-redis如何解析处理客户端请求
    • 实现mini-redis字符串操作
    • 硬核复刻redis底层双向链表核心实现
    • 动手复刻redis之go语言下的字典的设计与落地
    • Go 语言下的 Redis 跳表设计与实现
    • Go 语言版 Redis 有序集合指令复刻探索
  • 项目编排

    • Spring脚手架创建简记
    • Spring脚手架集成分页插件
    • Spring脚手架集成校验框架
    • maven父子模块两种搭建方式简记
    • SpringBoot+Vue3前后端快速整合入门
    • 来聊聊Java项目分层规范
  • 场景设计

    • Java实现文件分片上传
    • 基于时间缓存优化浏览器轮询阻塞问题
    • 基于EasyExcel实现高效导出
    • 10亿数据高效插入MySQL最佳方案
    • 从开源框架中学习那些实用的位运算技巧
  • CI/CD

    • 基于NETAPP实现内网穿透
    • 基于Gitee实现Jenkins自动化部署SpringBoot项目
    • Jenkins离线安装部署教程简记
    • 基于Nexus搭建Maven私服基础入门
    • 基于内网的Jenkins整合gitlab综合方案简记
  • 监控方法论

    • SpringBoot集成Prometheus与Grafana监控
    • Java监控度量Micrometer全解析
    • 从 micrometer计量器角度快速上手promQL
    • 硬核安利一个监控告警开源项目Nightingale
  • Spring AI

    • Spring AI Alibaba深度实战:一文掌握智能体开发全流程
    • Spring AI Alibaba实战:JVM监控诊断Arthas Agent的工程化构建与最佳实践
  • 大模型评测

    • M2.7 真能打!我用两个真实场景测了测,结果有点意外
    • Qoder JetBrains插件评测:祖传代码重构与接口优化实战
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Netty

    • 一文快速了解高性能网络通信框架Netty
    • Netty网络传输简记
    • 来聊聊Netty的ByteBuf
    • 来聊聊Netty消息发送的那些事
      • 前言
      • 一个消息积压导致OOM的例子
      • 原因分析
      • 基于调试剖析问题原因
      • 解决方式
      • 详解高低水位工作原理
      • 小结
      • 更进一步——详解ChannelOutboundBuffer
      • 参考文献
    • 基于Netty连接池泄露问题了解客户端启动源码
    • 来聊聊Netty使用不当导致的并发波动问题
    • 关于使用Netty业务处理器ChannelHanlder的一些注意事项
    • 解密Netty高性能之谜:NioEventLoop线程池阻塞分析与调优策略
    • Linux下Netty实现高性能UDP服务
    • netty源码编译跑通简记
    • 基于Netty服务端快速了解核心组件
    • 用Netty快速落地一个客户端程序
    • 详解Netty中的责任链Pipeline如何管理ChannelHandler
    • 来聊聊Netty几个开箱即用的处理器框架
    • 聊聊Netty中几个重要的生命周期
    • Netty的几种IO模式的实现与切换
    • 聊聊Netty异常传播链与最佳实践
    • 从Netty的ByteBuf中学习高并发场景下的内存优化艺术
    • 聊聊Netty客户端断线重连的设计与实现
    • 基于Netty源码学习那些并发技巧
    • Netty连接可靠性Idle监测连环问
    • Netty如何驾驭TCP流式传输?粘包拆包问题全解与编解码器最佳实践
    • Netty解码器源码解析
    • Netty Reactor模型常见知识点小结
  • 消息队列

  • Nginx

  • 中间件
  • Netty
sharkchili
2023-06-01
目录

来聊聊Netty消息发送的那些事

# 前言

我们在上一篇文章来聊聊Netty的ByteBuf (opens new window)了解到Netty的传输API的使用,这一片我们来聊聊Netty消息发送时的一些注意事项。

# 一个消息积压导致OOM的例子

我们现在有这么一个案例,客户端是基于Netty框架实现网络通信的,多个客户端向服务端发送消息时出现OOM问题,对此我们自己编写了一段测试模拟高并发的客户端消息发送尝试重现这个问题。

先来看看服务端的代码,如下所示,一套标准的模板。

public class Server5 {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                //.handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        //绑定自定义业务逻辑处理器
                        p.addLast(new ServerHandler());
                    }
                });
        //绑定9999端口
        ChannelFuture f = b.bind(9999).sync();
        //监听关闭
        f.channel().closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

而业务逻辑处理器代码也很简单,直接将客户端发来的东西原原本本传回去即可

@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
	static ExecutorService executorService = Executors.newSingleThreadExecutor();
	PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		//将客户端发送的消息原原本本发回去
		ByteBuf byteBuf = (ByteBuf) msg;
		System.out.println(">>>>>>>>>服务端,byteBuf=" + byteBuf);
		ctx.write(msg);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

了解的服务端源码之后,我们不妨再来看看客户端的代码,启动代码很简单,先睡10s给我们连接JVM监控工具,然后连接9999端口。

public class Client5 {

    @SuppressWarnings({"unchecked", "deprecation"})
    public static void main(String[] args) throws Exception {
        //休眠10s让 jvisualvm可以连上
        TimeUnit.SECONDS.sleep(10);


        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        //自定义业务逻辑处理器
                        p.addLast(new ClientHandler());

                    }
                });
        //连接9999端口
        ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
        //监听关闭和设置监听
        f.channel().closeFuture().sync();
        f.channel().closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                group.shutdownGracefully();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

业务逻辑处理器的代码也很简单,创建一个线程无限发送消息以模拟高并发场景。

public class ClientHandler extends ChannelInboundHandlerAdapter {

    Runnable loadRunner;

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        loadRunner = new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ByteBuf msg = null;
                final int len = "压测NettyOOM异常".getBytes().length;
                //无限循环模拟并发提交数据
                while (true) {
                    msg = Unpooled.wrappedBuffer("压测NettyOOM异常".getBytes());
                    ctx.writeAndFlush(msg);
                }
            }
        };
        new Thread(loadRunner, "LoadRunner-Thread").start();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //用完就释放掉
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

完成上述编码工作之后,我们先启动服务端,然后客户端添加下面这段参数调整堆内存然后再启动

-Xmn256m -Xmx256m
1

我们打开jvisualvm之后发现,老年代内存居高不下,而且频繁的发生GC。

在这里插入图片描述

最终我们的控制台出现了OOM异常

在这里插入图片描述

# 原因分析

为了剖析问题的原因,我们使用jmap导出内存使用情况进行问题排查。首先我们定位到client进程号为18096

C:\Users\xxxxx>jps
18096 Client5
10900 Launcher
13636
14212 Jps
15828 Launcher
16692 Server5
17080 Main
14668 Launcher
8844
1
2
3
4
5
6
7
8
9
10

然后使用以下命令将内存使用情况导出

jmap -dump:format=b,file=e:/oom.hprof 18096
1

使用mat打开之后可以发现大量内存被占用,而问题出现在NioEventLoop,我们不妨点看detail查看究竟。

在这里插入图片描述

可以看到长长的一坨对象内存信息,我们不妨点开一看详情。

在这里插入图片描述

很明显问题出在WriteAndFlushTask,此时我们大概率推测是我们的客户端调用writeAndFlush写的有问题。

在这里插入图片描述

同时我们查看内存使用详情也发现这些堆积的正式我们发送的消息,可以看到内容都是压测NettyOOM异常。

在这里插入图片描述

如下图可以发现,最占内存的就是我们的byte消息。

在这里插入图片描述

最后我们再看看,这些内存占用的关系。

在这里插入图片描述

可以看到,NioEventLoop中的队列堆积着大量的lambda任务,很明显,我们发送的消息被积压了,所以我们不妨调试一下看看writeAndFlush方法看看为什么积压。

在这里插入图片描述

# 基于调试剖析问题原因

为了排查问题,我们打好断点并将循环去掉

在这里插入图片描述

首先来到writeAndFlush内部,我们直接进入write查看写的时候做了些什么。

@Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }

        write(msg, true, promise);

        return promise;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

经过调试我们发现代码在执行executor.inEventLoop得知当前线程不是EventLoop线程所以走了else逻辑,所以我们不妨看看 WriteAndFlushTask.newInstance做了什么。

 private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        //判断当前执行任务的线程是否在executor中
        if (executor.inEventLoop()) {
        //如果在则直接发送
          ....
        } else {
            AbstractWriteTask task;
            if (flush) {
            //代码走到了这里
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

步入代码一看,整体逻辑就是封装一个task并返回,没有什么特殊的地方,所以我们不妨看看上面的safeExecute方法具体怎么执行的。

 private static WriteAndFlushTask newInstance(
                AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
            WriteAndFlushTask task = RECYCLER.get();
            init(task, ctx, msg, promise);
            return task;
        }
1
2
3
4
5
6

可以看到safeExecute会调用一个 executor.execute(runnable);提交当前任务,我们不妨看看内部做了什么。

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
          ......
        }
    }
1
2
3
4
5
6
7

经过笔者调试终于发现了问题所在,原来任务提交后如果不是NioEventLoop的任务会直接提交到队列中等待轮询,由于当前系统消息非常多,导致当前任务没有及时被处理,由此出现消息积压,进而出现OOM问题。

在这里插入图片描述

# 解决方式

从上文我们不难看出问题原因出现在客户端发送消息使用的线程并不是NioEventLoop线程,导致消息积压到MpscUnboundedArrayQueue中。

在这里插入图片描述

这一点我们在导出的内存图中也可以印证这一点。

在这里插入图片描述

由此我们得出两种解决方式:

  1. 如果问题出现在服务端,可以对服务端可以进行流控。
  2. 对于客户端积压,可以采取高低水位并发保护机制。

对于本次问题我们就必须采用方案2了,设置高水位,确保消息积压达到高水压时直接将channel状态设置为不可写。

所以我们在客户端的选项中添加高水位为1G,并将业务逻辑处理器改为WaterClientHandler

public class Client5 {

    @SuppressWarnings({"unchecked", "deprecation"})
    public static void main(String[] args) throws Exception {
        //休眠10s让 jvisualvm可以连上
        TimeUnit.SECONDS.sleep(10);


        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
				.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 1024 * 1024)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        //自定义业务逻辑处理器
                        p.addLast(new WaterClientHandler());

                    }
                });
        //连接9999端口
        ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
        //监听关闭和设置监听
        f.channel().closeFuture().sync();
        f.channel().closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                group.shutdownGracefully();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

WaterClientHandler代码如下,我们直接看channelActive即可,设置高水位为10G,当消息堆积达到高水位值isWritable就会发现false进而我们的客户端走到不可写的逻辑中。

public class WaterClientHandler extends ChannelInboundHandlerAdapter {

	private final ByteBuf firstMessage;

	Runnable loadRunner;

	AtomicLong sendSum = new AtomicLong(0);

	Runnable profileMonitor;
	static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

	public WaterClientHandler() {
		firstMessage = Unpooled.buffer(SIZE);
		for (int i = 0; i < firstMessage.capacity(); i++) {
			firstMessage.writeByte((byte) i);
		}
	}

	@Override
	public void channelActive(final ChannelHandlerContext ctx) {
	//设置高水位为10G
		ctx.channel().config().setWriteBufferHighWaterMark(10 * 1024 * 1024);
		loadRunner = new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				ByteBuf msg = null;
				final int len = "压测NettyOOM异常".getBytes().length;
				while (true) {
					//查看当前积压消息是否达到阈值判断当前channel是否可写
					if (ctx.channel().isWritable()) {
						msg = Unpooled.wrappedBuffer("压测NettyOOM异常".getBytes());
						ctx.writeAndFlush(msg);
					} else {
						System.out.println("The write queue is busy!");
					}
				}
			}
		};
		new Thread(loadRunner, "LoadRunner-Thread").start();
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ReferenceCountUtil.release(msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

完成后,项目启动时,我们就发现消息堆积达到高水位时,直接响应客户端不可消息,不会执行消息处理发送的逻辑。

在这里插入图片描述

查看GC情况还是很频繁,但是不会再出现OOM,我们完全可以在业务上进行进一步的优化。

在这里插入图片描述

# 详解高低水位工作原理

我们不妨看看高低水位的工作机制,可以看到在连接建立方法channelActive中我们调用的设置高低水位的方法setWriteBufferHighWaterMark会通过cas的方式在客户端设置高水位的值。

在这里插入图片描述

后续我们写入时就可以调用isWritable方法,可以看到该方法会获取buf的大小,然后通过

 @Override
    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable();
    }
1
2
3
4
5

进入内部如果不可写的标志返回0,则说明当前是可写的。由此我们的客户端可以继续发送消息。

 public boolean isWritable() {
        return unwritable == 0;
    }
1
2
3

# 小结

总结一下netty消息发送的工作原理,如下图所示,可以看到客户端发送消息的处理流程中,如果为NioEventLoop的线程的任务会直接交予channelOutBoundBuffer进行write操作。反之非NioEventLoop线程的任务会直接堆到队列中,等待NioEventLoop执行。

正是如此,高并发场景下,大量消息堆到队列中无法及时处理才导致我们的OOM问题。

所以如果单个channel对应的EventLoop对于消息处理不过来时,我们不仅可以通过高低位来做到孔子,如果机器允许的情况下,我们建议调整EventLoop的线程数来提高服务器性能。

在这里插入图片描述

# 更进一步——详解ChannelOutboundBuffer

上文我们提到了ChannelOutboundBuffer发送消息,ChannelOutboundBuffer发送消息的工作方式也很简单:

  1. 我们调用write方法进行消息发送。
  2. 任务提交到NioEventLoop中。
  3. NioEventLoop轮询并提交任务。
  4. 任务走到ChannelOutboundBuffer将消息添加到自己的链表中。
  5. 判断当前缓冲区是否超过高水位,如果没超过则进行发送消息。
  6. 消息发送后释放内存。

了解整体步骤,我们直接通过debug了解一下关键流程,经过这么多篇文章的介绍,我们不妨直接在NioEventLoop轮询处理写任务的地方打个断点。

所以我们在AbstractChannelHandlerContext的run打个断点。

在这里插入图片描述

步入AbstractChannelHandlerContext的write可以看到就做了两件事,一个是调用write将消息添加到缓冲区,然后调用invokeFlush刷新并发送消息。


 @Override
        public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            super.write(ctx, msg, promise);
            ctx.invokeFlush();
        }

1
2
3
4
5
6
7

我们不断步进代码,最终走到invokeWrite0,可以看到它调用了ChannelOutboundHandler进行写入操作,我们不妨看看write干了些什么。

private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
1
2
3
4
5
6
7

核心步骤如下所示,先将消息转为直接内存,然后添加到ChannelOutboundBuffer 末尾,我们不妨看看addMessage做了什么。

@Override
        public final void write(Object msg, ChannelPromise promise) {
           .....
 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            int size;
            try {
            //转为直接内存
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
			//添加到ChannelOutboundBuffer 链表后面
            outboundBuffer.addMessage(msg, size, promise);
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

可以看到就是将消息封装成节点,然后追加到链表尾巴的操作,最后调用incrementPendingOutboundBytes,我们步入看看incrementPendingOutboundBytes做了什么。

public void addMessage(Object msg, int size, ChannelPromise promise) {
//封装为节点
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        //尾节点指向当前节点
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

可以看到代码逻辑非常简单,查看写入的缓冲区是否会大于高水位,如果大于则将写标志设置为不可写。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
1
2
3
4
5
6
7
8
9
10

自此,write方法都结束,我们回到AbstractChannelHandlerContext看看flush做了什么。

如下可以看到,它会调用ChannelOutboundHandler调用flush,我们步入看看。

 private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
1
2
3
4
5
6

逻辑如下,首先会调用addFlush取出未处理的节点,然后调用flush0进行发送。

@Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
			//从链表中取出最前面的为完成的节点
            outboundBuffer.addFlush();
            //
            flush0();
        }
1
2
3
4
5
6
7
8
9
10
11
12
13

经过笔者整理,可以看到flush0核心逻辑就是调用doWrite处理消息。

 @SuppressWarnings("deprecation")
        protected void flush0() {
          ......

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                ........
                }
            } finally {
                inFlush0 = false;
            }
        }
1
2
3
4
5
6
7
8
9
10
11
12
13

重点来了,doWrite会首先获取javaChannel,还有writeSpinCount 即写消息最大循环次数,默认为16.这意味如果是netty的消息,在后续的while循环中16次没有完成数据写入javaChannel,netty就会调用incompleteWrite到netty注册一个写事件,等待下一次继续执行写任务,避免本地写事件占用线程时间。

@Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
   		 //获取javaChannel
        SocketChannel ch = javaChannel();
        //获取netty消息最大写入循环次数
        int writeSpinCount = config().getWriteSpinCount();
      
......

          
            //判断消息是不是netty的消息
            switch (nioBufferCnt) {
                case 0:
                	// 如果是netty消息则走这个分支,调用doWrite0将数据写入javaChannle,writeSpinCount 默认为16,这意为着循环中超过16次没写完,则netty会继续注册一个写事件,等待下一次执行
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                  //非netty的则走这段逻辑
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    //往javaChannle写数据返回写入字节数
                    final int localWrittenBytes = ch.write(buffer);
                    //如果小于0则说明写入失败,则注册一个写事件等待NioEventLoop下次轮询到再执行
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    //如果消息都写完,这里就会进行读写索引更新,如果都读取完并发送了,会直接释放内存
                    in.removeBytes(localWrittenBytes);
                    //写入次数自减
                    --writeSpinCount;
                    break;
                }
                default: {
                 .........
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

为保证文章的连贯,补充一下incompleteWrite,可以看到它内部会调用setOpWrite向NioSocketChannel注册写事件。

protected final void incompleteWrite(boolean setOpWrite) {
        //如果没写完,则注册写事件
        if (setOpWrite) {
            setOpWrite();
        } else {
      .....
        }
    }
1
2
3
4
5
6
7
8

注册逻辑如下,改方法用了与运算,判断当前是否注册了写事件,如果没有用或运算确保搞笑的将写事件注册到NioSocketChannel中。

 protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
     
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        //使用或操作,注册写事件
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
1
2
3
4
5
6
7
8
9

自此全文结束。

# 参考文献

Java性能调优 6步实现项目性能升级 (opens new window)

编辑 (opens new window)
上次更新: 2026/03/26, 01:05:31
来聊聊Netty的ByteBuf
基于Netty连接池泄露问题了解客户端启动源码

← 来聊聊Netty的ByteBuf 基于Netty连接池泄露问题了解客户端启动源码→

最近更新
01
基于EasyExcel实现高效导出
03-25
02
从开源框架中学习那些实用的位运算技巧
03-25
03
浅谈分布式架构设计思想和常见优化手段
03-25
更多文章>
Theme by Vdoing | Copyright © 2025-2026 Evan Xu | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×
×