聊聊Netty异常传播链与最佳实践
# 写在文章开头
Netty通过责任链的思想解耦了各个业务的处理逻辑,是的用户可以非常方便的根据不同的生命周期进行相应的业务处理。而本文将针对Netty中的异常和异常传播过程进行分析,并给出最佳的处理技巧,希望对你有帮助。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 详解Netty中异常的工作机制和处理对策
# 异常问题复现
为了更好演示和说明,笔者这里给出一段服务都引导类配置的代码,可以看到笔者依次添加了两个入站处理器InboundHandler和出站处理器OutboundHandler来处理客户端连接:
public static void main(String[] args) {
//1. 声明引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2. 声明主从reactor线程组
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
serverBootstrap.group(boss, worker)//3. 基于上述线程池创建主从reactor模型
.channel(NioServerSocketChannel.class)//server channel采用NIO模型
.childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客户端读写请求处理器到subreactor中
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 对于ChannelInboundHandlerAdapter,收到消息后会按照顺序执行即 A -> B
ch.pipeline().addLast(new InboundHandlerA())
.addLast(new InboundHandlerB());
// 处理写数据的逻辑,顺序是反着的 B -> A
ch.pipeline().addLast(new OutboundHandlerA())
.addLast(new OutboundHandlerB())
.addLast(new OutboundHandlerC());
}
});
//绑定8080端口并设置回调监听结果
serverBootstrap.bind("127.0.0.1", 8080)
.addListener(f -> {
if (f.isSuccess()) {
System.out.println("连接成功");
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
为了模拟异常,笔者在InboundHandlerA 中的channelRead方法编写了一个运算异常,并且重写了exceptionCaught方法打印异常详情,其余InboundHandler同理,这里就不贴出了:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA : " + ((ByteBuf) msg).toString(StandardCharsets.UTF_8));
//模拟读取数据过程中发生的异常
int num = 1 / 0;
//将当前的处理过的msg转交给pipeline的下一个ChannelHandler
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerA exceptionCaught : " + cause.getMessage());
super.exceptionCaught(ctx, cause);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
对应的OutboundHandler逻辑也是一样,除了必要的write方法重写以外,同样重写exceptionCaught用于打印异常:
ublic class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
///将当前的处理过的msg转交给pipeline的下一个ChannelHandler
super.write(ctx, msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutboundHandlerA exceptionCaught: " + cause.getMessage());
super.exceptionCaught(ctx, cause);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
完成后我们启动程序,并通过telnet方式进行连接传输数据时,可以直接从控制台看到异常会按照我们处理器的添加顺序从InboundHandler顺序传播到OutboundHandler上:
InBoundHandlerA : 1
InboundHandlerA exceptionCaught : / by zero
InboundHandlerB exceptionCaught: / by zero
OutboundHandlerA exceptionCaught: / by zero
OutboundHandlerB exceptionCaught: / by zero
OutboundHandlerC exceptionCaught: / by zero
八月 21, 2024 1:18:47 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.ArithmeticException: / by zero
at netty.book.inoutbound.in.InboundHandlerA.channelRead(InboundHandlerA.java:15)
......
2
3
4
5
6
7
8
9
10
11
12
# 详解异常传播链的执行过程
接下来我们就从源码的角度来分析这个问题,我们通过引导类添加处理器时,通过addLast添加了这些读写处理器,本质上底层就是将这些处理器封装为AbstractChannelHandlerContext,然后通过链表的方式进行不断追加,由此构成一条处理链。

随后我们第一个读处理器InboundHandlerA 触发了一个错误,执行exceptionCaught方法,又因为错误没有及时被处理,导致这个错误按照从入站处理器InboundHandler再到出站处理器OutboundHandler顺序传播:

我们可以从源码来印证笔者的说法,先来说说addLast方法,这一点我们可以直接查看addLast底层调用即DefaultChannelPipeline的addLast方法,可以看到其内部正如笔者所说将处理器封装为AbstractChannelHandlerContext 然后追加到pipeline的链表上:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//......
//将处理器封装为ChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
//追加到当前pipeline链表上
addLast0(newCtx);
//......
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
对于异常,我们可以直接从上文处理器所服装的上下文AbstractChannelHandlerContext的invokeChannelRead方法印证,当服务端收到数据时就会调用invokeChannelRead方法处理,其内部就会调用到我们重写的channelRead,一旦出现异常就会直接触发notifyHandlerException传播这个异常触发所有处理器的exceptionCaught方法:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//调用我们InboundHandlerA的channelRead
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//出现异常,调用InboundHandlerA的exceptionCaught
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
步入invokeExceptionCaught即可看到其内部本质就是回调我们处理器重写的exceptionCaught:
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
//......
}
} else {
fireExceptionCaught(cause);
}
}
2
3
4
5
6
7
8
9
10
11
最终走到我们的InboundHandlerA的exceptionCaught方法,逻辑之前已经说过,我们仅仅是打印,然后调用exceptionCaught继续将这个错误传播下去:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerA exceptionCaught : " + cause.getMessage());
super.exceptionCaught(ctx, cause);
}
2
3
4
5
InboundHandlerA调用exceptionCaught传播异常,步入exceptionCaught即可看到其内部逻辑即直接调用fireExceptionCaught将异常传给InboundHandlerB,后续过程也是同理,所以这里就不多做赘述了:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//当前hander调用fireExceptionCaught传播异常
ctx.fireExceptionCaught(cause);
}
//fireExceptionCaught调用invokeExceptionCaught将异常传播给当前链表节点的next节点,也就是InboundHandlerB
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(next, cause);
return this;
}
2
3
4
5
6
7
8
9
10
11
12
最终错误异常走到tail节点也就是我们的TailContext,因为异常没有即使被处理,它会直接调用onUnhandledInboundException告警打印这个错误,完成后调用release将异常释放掉(其实底层就返回一个false):
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//打印错误
onUnhandledInboundException(cause);
}
protected void onUnhandledInboundException(Throwable cause) {
try {
//打印异常信息
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 异常传播处理的最佳实践
按照异常传播链的顺序性,笔者建议在处理器的最后追加一个ExceptionHandler ,当处理器遇到异常时,全部统一将异常流转到这个ExceptionHandler 然后根据类型进行统一处理:

对此我们给出一段ExceptionHandler 的使用示例:
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception caught:" + cause.getMessage());
//按照异常类型按需处理
if (cause instanceof IOException) {
ctx.close();
}
}
}
2
3
4
5
6
7
8
9
10
11
对应的追加的方式如下,可以看到直接在尾部进行追加:
// 对于ChannelInboundHandlerAdapter,收到消息后会按照顺序执行即 A -> B
ch.pipeline().addLast(new InboundHandlerA())
.addLast(new InboundHandlerB());
// 处理写数据的逻辑,顺序是反着的 B -> A
ch.pipeline().addLast(new OutboundHandlerA())
.addLast(new OutboundHandlerC())
.addLast(new OutboundHandlerB());
ch.pipeline().addLast(new ExceptionHandler());
2
3
4
5
6
7
8
9
10
# 小结
本文从一个异常错误的案例深入剖析了Netty异常处理链,并给出响应的处理技巧,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
