来聊聊Netty消息发送的那些事
# 前言
我们在上一篇文章来聊聊Netty的ByteBuf (opens new window)了解到Netty的传输API的使用,这一片我们来聊聊Netty消息发送时的一些注意事项。
# 一个消息积压导致OOM的例子
我们现在有这么一个案例,客户端是基于Netty框架实现网络通信的,多个客户端向服务端发送消息时出现OOM问题,对此我们自己编写了一段测试模拟高并发的客户端消息发送尝试重现这个问题。
先来看看服务端的代码,如下所示,一套标准的模板。
public class Server5 {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
//.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//绑定自定义业务逻辑处理器
p.addLast(new ServerHandler());
}
});
//绑定9999端口
ChannelFuture f = b.bind(9999).sync();
//监听关闭
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
而业务逻辑处理器代码也很简单,直接将客户端发来的东西原原本本传回去即可
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
static ExecutorService executorService = Executors.newSingleThreadExecutor();
PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//将客户端发送的消息原原本本发回去
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(">>>>>>>>>服务端,byteBuf=" + byteBuf);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
了解的服务端源码之后,我们不妨再来看看客户端的代码,启动代码很简单,先睡10s给我们连接JVM监控工具,然后连接9999端口。
public class Client5 {
@SuppressWarnings({"unchecked", "deprecation"})
public static void main(String[] args) throws Exception {
//休眠10s让 jvisualvm可以连上
TimeUnit.SECONDS.sleep(10);
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//自定义业务逻辑处理器
p.addLast(new ClientHandler());
}
});
//连接9999端口
ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
//监听关闭和设置监听
f.channel().closeFuture().sync();
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
group.shutdownGracefully();
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
业务逻辑处理器的代码也很简单,创建一个线程无限发送消息以模拟高并发场景。
public class ClientHandler extends ChannelInboundHandlerAdapter {
Runnable loadRunner;
@Override
public void channelActive(final ChannelHandlerContext ctx) {
loadRunner = new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
ByteBuf msg = null;
final int len = "压测NettyOOM异常".getBytes().length;
//无限循环模拟并发提交数据
while (true) {
msg = Unpooled.wrappedBuffer("压测NettyOOM异常".getBytes());
ctx.writeAndFlush(msg);
}
}
};
new Thread(loadRunner, "LoadRunner-Thread").start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//用完就释放掉
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
完成上述编码工作之后,我们先启动服务端,然后客户端添加下面这段参数调整堆内存然后再启动
-Xmn256m -Xmx256m
我们打开jvisualvm之后发现,老年代内存居高不下,而且频繁的发生GC。

最终我们的控制台出现了OOM异常

# 原因分析
为了剖析问题的原因,我们使用jmap导出内存使用情况进行问题排查。首先我们定位到client进程号为18096
C:\Users\xxxxx>jps
18096 Client5
10900 Launcher
13636
14212 Jps
15828 Launcher
16692 Server5
17080 Main
14668 Launcher
8844
2
3
4
5
6
7
8
9
10
然后使用以下命令将内存使用情况导出
jmap -dump:format=b,file=e:/oom.hprof 18096
使用mat打开之后可以发现大量内存被占用,而问题出现在NioEventLoop,我们不妨点看detail查看究竟。

可以看到长长的一坨对象内存信息,我们不妨点开一看详情。

很明显问题出在WriteAndFlushTask,此时我们大概率推测是我们的客户端调用writeAndFlush写的有问题。

同时我们查看内存使用详情也发现这些堆积的正式我们发送的消息,可以看到内容都是压测NettyOOM异常。

如下图可以发现,最占内存的就是我们的byte消息。

最后我们再看看,这些内存占用的关系。

可以看到,NioEventLoop中的队列堆积着大量的lambda任务,很明显,我们发送的消息被积压了,所以我们不妨调试一下看看writeAndFlush方法看看为什么积压。

# 基于调试剖析问题原因
为了排查问题,我们打好断点并将循环去掉

首先来到writeAndFlush内部,我们直接进入write查看写的时候做了些什么。
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
经过调试我们发现代码在执行executor.inEventLoop得知当前线程不是EventLoop线程所以走了else逻辑,所以我们不妨看看 WriteAndFlushTask.newInstance做了什么。
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//判断当前执行任务的线程是否在executor中
if (executor.inEventLoop()) {
//如果在则直接发送
....
} else {
AbstractWriteTask task;
if (flush) {
//代码走到了这里
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
步入代码一看,整体逻辑就是封装一个task并返回,没有什么特殊的地方,所以我们不妨看看上面的safeExecute方法具体怎么执行的。
private static WriteAndFlushTask newInstance(
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, promise);
return task;
}
2
3
4
5
6
可以看到safeExecute会调用一个 executor.execute(runnable);提交当前任务,我们不妨看看内部做了什么。
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
......
}
}
2
3
4
5
6
7
经过笔者调试终于发现了问题所在,原来任务提交后如果不是NioEventLoop的任务会直接提交到队列中等待轮询,由于当前系统消息非常多,导致当前任务没有及时被处理,由此出现消息积压,进而出现OOM问题。

# 解决方式
从上文我们不难看出问题原因出现在客户端发送消息使用的线程并不是NioEventLoop线程,导致消息积压到MpscUnboundedArrayQueue中。

这一点我们在导出的内存图中也可以印证这一点。

由此我们得出两种解决方式:
- 如果问题出现在服务端,可以对服务端可以进行流控。
- 对于客户端积压,可以采取高低水位并发保护机制。
对于本次问题我们就必须采用方案2了,设置高水位,确保消息积压达到高水压时直接将channel状态设置为不可写。
所以我们在客户端的选项中添加高水位为1G,并将业务逻辑处理器改为WaterClientHandler
public class Client5 {
@SuppressWarnings({"unchecked", "deprecation"})
public static void main(String[] args) throws Exception {
//休眠10s让 jvisualvm可以连上
TimeUnit.SECONDS.sleep(10);
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 1024 * 1024)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//自定义业务逻辑处理器
p.addLast(new WaterClientHandler());
}
});
//连接9999端口
ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
//监听关闭和设置监听
f.channel().closeFuture().sync();
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
group.shutdownGracefully();
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
WaterClientHandler代码如下,我们直接看channelActive即可,设置高水位为10G,当消息堆积达到高水位值isWritable就会发现false进而我们的客户端走到不可写的逻辑中。
public class WaterClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
Runnable loadRunner;
AtomicLong sendSum = new AtomicLong(0);
Runnable profileMonitor;
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public WaterClientHandler() {
firstMessage = Unpooled.buffer(SIZE);
for (int i = 0; i < firstMessage.capacity(); i++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(final ChannelHandlerContext ctx) {
//设置高水位为10G
ctx.channel().config().setWriteBufferHighWaterMark(10 * 1024 * 1024);
loadRunner = new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
ByteBuf msg = null;
final int len = "压测NettyOOM异常".getBytes().length;
while (true) {
//查看当前积压消息是否达到阈值判断当前channel是否可写
if (ctx.channel().isWritable()) {
msg = Unpooled.wrappedBuffer("压测NettyOOM异常".getBytes());
ctx.writeAndFlush(msg);
} else {
System.out.println("The write queue is busy!");
}
}
}
};
new Thread(loadRunner, "LoadRunner-Thread").start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
完成后,项目启动时,我们就发现消息堆积达到高水位时,直接响应客户端不可消息,不会执行消息处理发送的逻辑。

查看GC情况还是很频繁,但是不会再出现OOM,我们完全可以在业务上进行进一步的优化。

# 详解高低水位工作原理
我们不妨看看高低水位的工作机制,可以看到在连接建立方法channelActive中我们调用的设置高低水位的方法setWriteBufferHighWaterMark会通过cas的方式在客户端设置高水位的值。

后续我们写入时就可以调用isWritable方法,可以看到该方法会获取buf的大小,然后通过
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable();
}
2
3
4
5
进入内部如果不可写的标志返回0,则说明当前是可写的。由此我们的客户端可以继续发送消息。
public boolean isWritable() {
return unwritable == 0;
}
2
3
# 小结
总结一下netty消息发送的工作原理,如下图所示,可以看到客户端发送消息的处理流程中,如果为NioEventLoop的线程的任务会直接交予channelOutBoundBuffer进行write操作。反之非NioEventLoop线程的任务会直接堆到队列中,等待NioEventLoop执行。
正是如此,高并发场景下,大量消息堆到队列中无法及时处理才导致我们的OOM问题。
所以如果单个channel对应的EventLoop对于消息处理不过来时,我们不仅可以通过高低位来做到孔子,如果机器允许的情况下,我们建议调整EventLoop的线程数来提高服务器性能。

# 更进一步——详解ChannelOutboundBuffer
上文我们提到了ChannelOutboundBuffer发送消息,ChannelOutboundBuffer发送消息的工作方式也很简单:
- 我们调用write方法进行消息发送。
- 任务提交到NioEventLoop中。
- NioEventLoop轮询并提交任务。
- 任务走到ChannelOutboundBuffer将消息添加到自己的链表中。
- 判断当前缓冲区是否超过高水位,如果没超过则进行发送消息。
- 消息发送后释放内存。
了解整体步骤,我们直接通过debug了解一下关键流程,经过这么多篇文章的介绍,我们不妨直接在NioEventLoop轮询处理写任务的地方打个断点。
所以我们在AbstractChannelHandlerContext的run打个断点。

步入AbstractChannelHandlerContext的write可以看到就做了两件事,一个是调用write将消息添加到缓冲区,然后调用invokeFlush刷新并发送消息。
@Override
public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
ctx.invokeFlush();
}
2
3
4
5
6
7
我们不断步进代码,最终走到invokeWrite0,可以看到它调用了ChannelOutboundHandler进行写入操作,我们不妨看看write干了些什么。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
2
3
4
5
6
7
核心步骤如下所示,先将消息转为直接内存,然后添加到ChannelOutboundBuffer 末尾,我们不妨看看addMessage做了什么。
@Override
public final void write(Object msg, ChannelPromise promise) {
.....
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
//转为直接内存
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//添加到ChannelOutboundBuffer 链表后面
outboundBuffer.addMessage(msg, size, promise);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
可以看到就是将消息封装成节点,然后追加到链表尾巴的操作,最后调用incrementPendingOutboundBytes,我们步入看看incrementPendingOutboundBytes做了什么。
public void addMessage(Object msg, int size, ChannelPromise promise) {
//封装为节点
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
//尾节点指向当前节点
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
可以看到代码逻辑非常简单,查看写入的缓冲区是否会大于高水位,如果大于则将写标志设置为不可写。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
2
3
4
5
6
7
8
9
10
自此,write方法都结束,我们回到AbstractChannelHandlerContext看看flush做了什么。
如下可以看到,它会调用ChannelOutboundHandler调用flush,我们步入看看。
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
2
3
4
5
6
逻辑如下,首先会调用addFlush取出未处理的节点,然后调用flush0进行发送。
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//从链表中取出最前面的为完成的节点
outboundBuffer.addFlush();
//
flush0();
}
2
3
4
5
6
7
8
9
10
11
12
13
经过笔者整理,可以看到flush0核心逻辑就是调用doWrite处理消息。
@SuppressWarnings("deprecation")
protected void flush0() {
......
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
........
}
} finally {
inFlush0 = false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
重点来了,doWrite会首先获取javaChannel,还有writeSpinCount 即写消息最大循环次数,默认为16.这意味如果是netty的消息,在后续的while循环中16次没有完成数据写入javaChannel,netty就会调用incompleteWrite到netty注册一个写事件,等待下一次继续执行写任务,避免本地写事件占用线程时间。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//获取javaChannel
SocketChannel ch = javaChannel();
//获取netty消息最大写入循环次数
int writeSpinCount = config().getWriteSpinCount();
......
//判断消息是不是netty的消息
switch (nioBufferCnt) {
case 0:
// 如果是netty消息则走这个分支,调用doWrite0将数据写入javaChannle,writeSpinCount 默认为16,这意为着循环中超过16次没写完,则netty会继续注册一个写事件,等待下一次执行
writeSpinCount -= doWrite0(in);
break;
case 1: {
//非netty的则走这段逻辑
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
//往javaChannle写数据返回写入字节数
final int localWrittenBytes = ch.write(buffer);
//如果小于0则说明写入失败,则注册一个写事件等待NioEventLoop下次轮询到再执行
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
//如果消息都写完,这里就会进行读写索引更新,如果都读取完并发送了,会直接释放内存
in.removeBytes(localWrittenBytes);
//写入次数自减
--writeSpinCount;
break;
}
default: {
.........
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
为保证文章的连贯,补充一下incompleteWrite,可以看到它内部会调用setOpWrite向NioSocketChannel注册写事件。
protected final void incompleteWrite(boolean setOpWrite) {
//如果没写完,则注册写事件
if (setOpWrite) {
setOpWrite();
} else {
.....
}
}
2
3
4
5
6
7
8
注册逻辑如下,改方法用了与运算,判断当前是否注册了写事件,如果没有用或运算确保搞笑的将写事件注册到NioSocketChannel中。
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
//使用或操作,注册写事件
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
2
3
4
5
6
7
8
9
自此全文结束。