Netty连接可靠性Idle监测连环问
Netty中连接可靠性中的Idle可靠性监测功能可能会涉及如下几道面试题:
- 为什么
TCP提供了keepalive,而我们的服务端程序还是需要Idle检测呢? TCP的keepalive和HTTP协议的keepalive有什么区别?- 如何设计应用层
Idle监测机制? - 项目中如何使用
idle检测? Netty是如何实现Idle检测的?Netty默认的IdleStateHandler有哪些?
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 详解Netty中的Idle检测面试题
# 为什么TCP提供了keepalive,而我们的服务端程序还是需要Idle检测呢
本质上Linux操作系统也提供了连接可靠性的机制,即keepalive,这一点我们可以通过查看Linux系统内核参数指令查看:
sudo sysctl -a|grep tcp_keep
对应输出结果如下:
net.ipv4.tcp_keepalive_intvl = 75
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_time = 7200
2
3
可以看到默认情况下Linux系统对于tcp连接的空闲检测为7200s也就是2小时执行一次,如果发现连接不可用会每隔75s再次进行确认,一旦超过9次则视为该连接下线:

这也就意味着2小时以内的连接出现以下异常是未知的:
- 对端异常崩溃。
- 对端连接还在,但是连接却不可达了。
- 对端连接可用,但是无法立即处理我们的请求。
为了保证业务程序连接可靠性去修改系统参数对于全局影响面是很大的,所以这才有了应用程序上的Idle监测。
# TCP的keepalive和HTTP协议的keepalive有什么区别
读者需要注意这个概念和http协议的上的keepalive加以区分,众所周知,一个网页中可能包含js、css、图片、接口请求等多个服务端网络交互,http协议的keepalive机制的出现就是为了能够复用当前的TCP连接完成所有的请求,由此避免每次创建TCP连接去请求资源的耗时和开销:

与之相反的是本文所强调的TCP keepalive更多强调的是传输层定时去检测的TCP连接是否是"通",即强调是连接可用性。
# 如何设计与实现应用层Idle监测机制
先来说说第一种方案,针对每一个建立连接的客户端channel设置定时任务,每隔一段时间进行一次连接监测:
channel.eventLoop().schedule(()->{
//todo 进行连接监测
},15, TimeUnit.SECONDS);
2
3
咋一看这种做法没有问题,请读者假设此时服务器和数万条连接同时进行连接监测,这其中的开销成本如何呢?而且这其中不乏大量可用连接,为了保证可靠性,定时做这样一件开销极大的事情是不是有点得不偿失呢?

于是我们就有了方案2,也就是空闲监测,对于这成千上万的连接,我们可以定时进行空闲监测只有出现空闲的连接,我们才发送keep alive进行探测,由此减少服务器的开销:

# 项目中如何使用idle检测
由于netty良好的封装,我们开启keep alive和idle监测都十分方便,大体分为以下两步:
- 开启
keep alive。 - 创建一个
idle监测处理器,设置好定时。
开始keep alive的方式的配置如下:
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true);
2
然后就是设置Idle监测处理器,可以看到笔者之前继承Netty自带的IdleStateHandler ,设置读空闲监测为15,其余写idle和全idle监测设置为0,定时单位为秒:
public class MyIdleStateHandler extends IdleStateHandler {
public MyIdleStateHandler() {
// 读超时时间、写超时时间、读写超时时间 指定0值不判断超时
super(15, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒内没有读到数据,关闭连接");
//空闲关闭
ctx.channel().close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
IdleStateHandler每当收到消息后,就会实时更新lastReadTime即上次读取消息时间,该时间是用于计算空闲时间的变量,我们的空闲监测就会基于当前时间减去lastReadTime,从而得到一个差值,然后基于这个差值再和我们设置的timeout超时时长进行比对,如果差值大于我们所设置的timeout就说明当前连接处于空闲即idle状态。
使用时我们建议将MyIdleStateHandler添加到pipeline首个位置,保证不会因为其他处理器异常导致无法收到消息进而无法更新IdleStateHandler的上次读取到数据时间lastReadTime:

对应的我们的代码如下所示:
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
//......
.option(ChannelOption.SO_KEEPALIVE, true)
//......
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 空闲检测放在最前方
.addLast(new MyIdleStateHandler())
//......
}
});
2
3
4
5
6
7
8
9
10
11
12
13
# 从源码角度分析Netty如何实现Idle检测
我们以读空闲为例,一旦我们按照上述代码完成idle监测代码之后,Netty就会向eventLoop提交一个当前channel的ReaderIdleTimeoutTask任务,它会定时执行,然后查看当前时间减去上次读取到消息的时间是否大于定时的间隔,如果大于则说明当前连接处理idle状态,于是封装一个读idle空闲事件将消息传播给我们的IdleStateHandler。反之,如果小于的话则说明当前连接处于活跃状态,更新定时监测间隔为当前时间减去上次读取到数据的时间间隔:

对应的我们给出ReaderIdleTimeoutTask 这个异步任务的源码,和笔者说的基本一致,读者可以参照笔者给的注释进行阅读:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
//......
@Override
protected void run(ChannelHandlerContext ctx) {
//获取定时任务执行间隔
long nextDelay = readerIdleTimeNanos;
//如果并不处于读取状态,则用定时间隔减去空闲时长
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
//小于0说明空闲了,设置下一次空闲监测并传播空闲事件到channelIdle
if (nextDelay <= 0) {
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//设置下一次空闲监测并传播空闲事件到channelIdle
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
//连接处于活跃状态,更新下次定时监测间隔为上一次当前时间减去上次读取到数据的时长
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
而写空闲监测也是差不多的原理,一旦发现到到达定时间隔尚未完成写入,则直接调用writeTimedOut将channel关闭,然后移除该任务:
@Override
public void run() {
//到达定时间隔尚未被成功写入
if (!promise.isDone()) {
try {
//将channel关闭
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
//移除WriteTimeout任务事件
removeWriteTimeoutTask(this);
}
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
//空闲检测到写idle则将连接关闭
if (!closed) {
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
ctx.close();
//将连接状态设置为true
closed = true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Netty中默认的IdleStateHandler有哪些?
通过查看源码可知Netty默认情况下已经给我们内置了读写Idle的处理器,它们分别是ReadTimeoutHandler和WriteTimeoutException:

查看ReadTimeoutHandler源码即可知晓,我们使用ReadTimeoutHandler时只需指明超时时间,一旦感知到了idle,ReadTimeoutHandler的channelIdle方法就会将当前channel连接关闭:
public class ReadTimeoutHandler extends IdleStateHandler {
//传入超时时间,单位为秒
public ReadTimeoutHandler(int timeoutSeconds) {
this(timeoutSeconds, TimeUnit.SECONDS);
}
//指明判断的超时时间和单位
public ReadTimeoutHandler(long timeout, TimeUnit unit) {
super(timeout, 0, 0, unit);
}
//感知到了超时调用readTimedOut方法
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
//传播ReadTimeoutException并将channel连接关闭
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
而WriteTimeoutHandler则特殊一些,我们指明时间之后,每次我们调用write操作之后,它都会向eventLoop提交一个WriteTimeoutTask,这个WriteTimeoutTask感知到超时之后就会将当前idle的channel关闭:

对应的我们给出WriteTimeoutHandler 的源码,可以看到它是个ChannelOutboundHandlerAdapter,我们设定好时间添加到pipeline上后,每次write操作就会提交一个超时检测的WriteTimeoutTask ,这个WriteTimeoutTask 感知到超时之后就会将channel关闭:
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
//设定超时时间
public WriteTimeoutHandler(int timeoutSeconds) {
this(timeoutSeconds, TimeUnit.SECONDS);
}
//设定超时时间
public WriteTimeoutHandler(long timeout, TimeUnit unit) {
ObjectUtil.checkNotNull(unit, "unit");
if (timeout <= 0) {
timeoutNanos = 0;
} else {
timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
}
}
//每次write传播传到到WriteTimeoutHandler后就会提交一个WriteTimeoutTask
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (timeoutNanos > 0) {
promise = promise.unvoid();
//提交一个WriteTimeoutTask
scheduleTimeout(ctx, promise);
}
ctx.write(msg, promise);
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
关于WriteTimeoutTask的逻辑,笔者这里也给出,可以看到该任务检测到超时之后就会调用writeTimedOut将channel关闭:
private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
//......
@Override
public void run() {
//超时后写入未完成,调用writeTimedOut将channel关闭
if (!promise.isDone()) {
try {
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
removeWriteTimeoutTask(this);
}
//将idle的channel关闭
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 小结
以上便是笔者对于netty中idle监测的面试题解析,写对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 参考
Http——Keep-Alive机制:https://www.cnblogs.com/caoweixiong/p/14720254.html (opens new window)
Netty 心跳服务之 IdleStateHandler 源码分析 :https://www.jianshu.com/p/f2ed73cf4df8 (opens new window)