Netty解码器源码解析
# 写在文章开头
# 解码器源码解析
# 解码器父类ByteToMessageDecoder源码分析
ByteToMessageDecoder是所有解码器的父类,要想了解Netty解码器的整体流程,我们可以从它的channelRead方法入手,源码逻辑比较清晰,它实现了解码操作各个工作的核心工序,并将解码逻辑交由下层自类实现,这里笔者先大概介绍一下核心流程:
- 判断
msg是否是ByteBuf,如果不是则直接调用fireChannelRead将msg传播,如果是则进入步骤2。 - 初始化一个解码器结果存放列表
out。 - 判断累加器是否为空,并将结果赋值给
first。 - 如果为空,则初始化累加器。
- 如果非空,则将
msg追加到累加器中。 - 执行
callDecode完成数据包解码。 - 将解码结果传播到下文业务处理器中,并置空
out列表。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果msg是ByteBuf则进入逻辑
if (msg instanceof ByteBuf) {
//创建一个存放解码结果的列表
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//将累加器是否为空的布尔值赋值给first
first = cumulation == null;
//如果first为null,说明本次是第一次累加,则直接将cumulation设置为data,反之将data累加到cumulation中
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//调用解码器进行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
//略
//获取解码输出结果
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//将解析到的byteBuf向下传播
fireChannelRead(ctx, out, size);
//回收out列表,将每一个索引位置清空
out.recycle();
}
} else {
//不是bytebuf的则直接向下传播
ctx.fireChannelRead(msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
我们不妨对上述整体流程进行详尽分析,上文提到累加操作的代码:cumulator.cumulate(ctx.alloc(), cumulation, data);查看cumulator的定义我们可以得到这样一段代码:
private Cumulator cumulator = MERGE_CUMULATOR;
查看MERGE_CUMULATOR实现,其整体步骤比较清晰:
- 查看累加器空间是否足够写入
in。 - 若不够则进行扩容,容量扩为当前累加器的容量加
in的大小,进入步骤4。 - 反之
buffer直接设置为累加器,进入步骤4。 - 将
in写入buffer中。 - 释放
in。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
//写的空间不足或者 cumulation.refCnt() > 1则对cumulation进行扩容
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
//反之buffer就等于累加器
buffer = cumulation;
}
// 将数据写入buffer
buffer.writeBytes(in);
// 释放in
in.release();
return buffer;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
再来看看callDecode方法,它是实际上完成解码操作的地方,可以看到ByteToMessageDecoder已经将大部分核心逻辑都实现了,唯独将decode设置为抽象方法,提供子类实现,整体核心逻辑也比较清晰:
- 判断传入的
byteBuf是否可读,若可读则进入循环。 - 判断当前解码列表是否有数据,若有则说明前几次的循环已经完成了一次完整的数据包解码操作,直接调用
fireChannelRead将数据和列表传到下一个解码器中,并将out列表清空。反之进入步骤3。 - 调用
decode完成解码操作,该方法是抽象方法,不同的解码器有着不同的实现,总之每个抽象类完成解码操作之后,都会将解码的结果存到out列表中。 - 判断本次循环是否有解码操作,若还未完成则循环跳到步骤2,反之结束循环。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//如果ByteBuf 可读,则进入循环
while (in.isReadable()) {
//获取解码列表的元素个数
int outSize = out.size();
//如果解码列表大于0,则说明之前循环执行过程中完成了byteBuf解码操作,则将解码列表传给下一个解码器,并将out列表清空
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//获取in的可读字节数
int oldInputLength = in.readableBytes();
//调用子类的decode方法将in解析成需要的数据存到out列表中
decode(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//经过解码后的out列表还是和原来的长度一样则说明本次循环没有完成一次完整的解码操作,则进入if逻辑中
if (outSize == out.size()) {
//如果oldInputLength和和in的可读长度一样,说明in的数据因为某些原因无法解码直接退出循环
if (oldInputLength == in.readableBytes()) {
break;
} else {
//走到这个分支说明本次循环有进行解码工作,但还是没有完全完成,跳到下一次循环进行
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# FixedLengthFrameDecoder源码分析
由上文我们知道了ByteToMessageDecoder将decode设置为抽象方法提供子类实现,FixedLengthFrameDecoder就是其中一个子类,它是基于固定长度的解码器,这一点我们在上文的示例代码中就已经做了示范。
在正式阅读源码之前,我们不妨看看FixedLengthFrameDecoder源码中作者所给的注释,可以看到作者给了一个非常直观的代码示例,大意是说假如我们收到了4个分段的数据包,每个包的内容分别是:A、BC、DEFG、HI 。
假如我们使用FixedLengthFrameDecoder作为解码器,并将长度设置为3,那么最终业务处理器得到的数据包将是:ABC、DEF、GHI。
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
* <pre>
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* </pre>
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
基于这个示例我们回到源码查看它是如何做到这一点的,它对于decode的实现,显示调用自己实现的decode生成一个decoded ,如果不为空则存到out列表中。
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
2
3
4
5
6
7
我们步入查看一下decode的逻辑,它会判断ByteBuf 可读长度是否小于固定长度,若小于固定长度则说明无法切割直接返回null,反之调用readRetainedSlice进行切割解码。
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//可读长度小于切割长度,直接返回null
if (in.readableBytes() < frameLength) {
return null;
} else {
//调用readRetainedSlice进行拆包
return in.readRetainedSlice(frameLength);
}
}
2
3
4
5
6
7
8
9
10
查看readRetainedSlice方法,它是AbstractByteBuf的提供的默认实现,逻辑也很简单,通过入参的length截取得到一个slice ,随后更新当前的byteBuf的写索引并将slice 返回。
@Override
public ByteBuf readRetainedSlice(int length) {
//基于length切割出一个完整的ByteBuf,记录当前读取的字节,然后直接返回
ByteBuf slice = retainedSlice(readerIndex, length);
readerIndex += length;
return slice;
}
2
3
4
5
6
7
# LineBasedFrameDecoder源码分析
LineBasedFrameDecoder是一个基于换行符实现的解码器,通过注释我们也可以知道,它进行拆包时会工具\n或者\r\n这两种换行符进行拆包的。
/**
* A decoder that splits the received {@link ByteBuf}s on line endings.
* <p>
* Both {@code "\n"} and {@code "\r\n"} are handled.
* For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
*/
2
3
4
5
6
查看源码入口,可以看到逻辑和FixedLengthFrameDecoder差不多,这里就不多赘述了,直接查看decode的实现。
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
2
3
4
5
6
7
decode的逻辑比较长,为了让读者更好的理解源码,笔者这里才用分段叙述的方式进行讲解,首先代码会查找换行符的位置,findEndOfLine查找换行符的方式比较特殊,考虑到换行符有\r\n或者\n,所有需要考虑下面3种情况:
- 查询数据中有没有
\n,若存在进入步骤2,若没有直接返回-1。 - 如果
\n前面是\r则返回\r的位置,反之进入步骤3。 - 反之返回
\n的位置。
final int eol = findEndOfLine(buffer);
完成eol计算之后,我们继续查看后续逻辑,来说说第1种情况,这里先说明一下discarding是丢弃标识,默认情况下为false,当它为true时则说明本次读取的数据包大于可读取的最大长度,需要丢弃。
下面这段代码即说明非丢弃模式下且存在换行符,它进行了这样的操作:
- 判断读取的长度
length是否大于最大长度maxLength,若大于则说明该数据包超长,需要跳过,故更新读索引,将读索引值设置为换行符索引后面的位置,若不大于则进入步骤2。 - 来到步骤2这一步则说明存在换行符且长度合法,判断
stripDelimiter是否为true,若为true则进入步骤3,若为false进入步骤4。 - 截取的数据不需要包含换行符,读取时跳过换行符,然后将数据返回。
- 截取的数据需要包含换行符,连着数据以及换行符一并截取并返回。
自此我们将情况1的流程都讲解完成了,对应的核心代码部分如下:
//非丢弃模式
if (!discarding) {
//存在换行符
if (eol >= 0) {
final ByteBuf frame;
//获取需要读取的长度
final int length = eol - buffer.readerIndex();
//获取换行符长度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
//如果长度大于最大长度,则调整读索引,设置到换行符后面,读取下一个数据包,并调用fail抛个异常,再返回null
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
//走到这说明长度合法,判断读取是否需要包含换行符,若为true则需要跳过,false则包含
if (stripDelimiter) {
//读取数据到frame
frame = buffer.readRetainedSlice(length);
//跳过换行符
buffer.skipBytes(delimLength);
} else {
//读取数据以及换行符到frame中
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
//略
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
情况2则是非丢弃模式下不存在换行符,它的执行步骤为:
- 获取可读长度
length。 - 如果
length大于maxLength,若不大于则说明当前数据还未达到规定数据包最大长度,直接返回null,等下一次收报新的数据流再查询一次,反之进入步骤3。 - 设置丢弃的长度为
length,更新读索引位置到写索引,意为跳过这段字节数据。 - 将丢弃模式
discarding设置true。 - 判断
failFast是否为true,若为true则抛出一个异常。
if (!discarding) {
if (eol >= 0) {
//略
} else {//非丢弃模式下不存在换行符
//获取可读长度
final int length = buffer.readableBytes();
//如果大于最大长度,则设置丢弃长度为length
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
因为上一个情况将discarding设置为了true,下一次收到数据流时我们来到了情况3,即丢弃模式且不存在换行符。它的执行步骤为,丢弃长度追加本次读取的可读的字节数,buffer直接跳过本地要读取的数据。
if (!discarding) {
//略
} else {
if (eol >= 0) {
//略
} else {
//丢弃长度追加本地可读的长度
discardedBytes += buffer.readableBytes();
//更新读取位置到本次读取的数据包后面
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
基于上述场景,我们本次读取到了换行符,即在丢弃模式下读取到了换行符,于是来到的情况4,说明本次读取数据尽管存在换行符,但还是需要丢弃,于是代码就会经过这样的执行步骤:
- 计算跳过长度加读取长度的长度总和。
- 计算换行符长度。
- 将读索引更新到换行符后面,意味跳过本次超长数据的读取。
- 丢弃模式设置为
false。 - 如果非快速失败,则抛出一个异常。
if (!discarding) {
//略
} else {
if (eol >= 0) {
//计算跳过长度+读取长度的长度总和
final int length = discardedBytes + eol - buffer.readerIndex();
//计算换行符长度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
//将读索引更新到换行符后面
buffer.readerIndex(eol + delimLength);
//重置丢弃长度和丢弃模式
discardedBytes = 0;
discarding = false;
//如果非快速失败,则抛出一个异常
if (!failFast) {
fail(ctx, length);
}
} else {
//略
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
最后我们做个小结完整的代码及注释如下:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
//查找换行符位置
final int eol = findEndOfLine(buffer);
//非丢弃模式
if (!discarding) {
//情况1:非丢弃模式且存在换行符
if (eol >= 0) {
final ByteBuf frame;
//读取可读长度以及换行符长度
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
//如果要读取的数据大于最大长度,则更新读索引到换行符后面,并抛出一个异常
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
//需要跳过换行符,则读取换行符之前的数据
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
//读取包含换行符的数据
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
//情况2:非丢弃模式且不包含换行符
//先获取读取长度
final int length = buffer.readableBytes();
//如果大于最大长度,则将模式切换为丢弃模式,丢弃长度为length
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
//情况3:丢弃模式下,存在换行符,则将换行符以及之前的数据全部跳过,完成后将丢弃模式设置为false,重置discardedBytes。
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
//情况4:丢弃模式下不存在换行符,追加丢弃数据长度,并将读索引更新到本次读取数据的末尾,放弃这段长数据的读取
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# DelimiterBasedFrameDecoder源码分析
基于特殊分隔符的解码器和行解码器差不多,只不过我们可以自定义设置进行分割的符号,入口代码套路一致,不多赘述。
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
2
3
4
5
6
7
8
9
decode代码还是很长,我们还是采用分段的形式来讲解,首先代码会判断lineBasedDecoder 是否不为空,若不为空则用lineBasedDecoder 进行解码,这是为什么呢?
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
2
3
我们在初始化解码器时,它会判断我们的分隔符,如果是换行符,则将lineBasedDecoder初始化,复用了行解码器。
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
//略
//如果我们定义的符号是换行符则直接复用行解码器
if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
//略
}
//略
}
2
3
4
5
6
7
8
9
10
11
12
紧接着来到第2大步,遍历所有的分隔符,找到符合存在某个分隔符的最短长度bytebuf。
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
//遍历分隔符
for (ByteBuf delim: delimiters) {
//找到索引值最小的分隔符,即最小长度byteBuf
int frameLength = indexOf(buffer, delim);
//如果长度大于且 小于minFrameLength则更新minFrameLength为当前长度和minDelim 为当前分隔符
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
2
3
4
5
6
7
8
9
10
11
12
于是我们来到了情况1,如果数据中存在分隔符,则进入以下步骤:
- 先判断是否是丢弃模式,如果是丢弃模式,则说明之前读取数据流过程中遇到超长数据,本次尽管读取到分隔符,也是一个超长的存在分隔符的数据,需要丢弃,故重置丢弃模式为
false,并跳过本次要读取的数据,直接返回null,反之进入步骤2。 - 如果读取的长度大于最大长度,则跳过并抛个异常,反之进入步骤3。
- 如果读取时需要跳过分隔符则仅读取分隔符之前的数据,反之连着分隔符一起读取。
- 返回读取结果。
//分隔符不为空,存在可读数据
if (minDelim != null) {
//获取分隔符长度
int minDelimLength = minDelim.capacity();
ByteBuf frame;
//如果当前为丢弃模式,即可说明之前的数据读取过程中存在超长数据,和本次包含分隔符的数据是一个整体,需要一并丢弃
if (discardingTooLongFrame) {
//重置丢弃模式为false,并将读索引更新到分隔符后面,跳过这些数据的读取
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
//重置tooLongFrameLength
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
//如果非快速失败则抛个异常
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
//如果需要读取的长度大于最大长度,则跳过并抛个异常
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
//如果读取时需要跳过分隔符则读取分隔符之前的数据并跳过分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
//如果需要包含分隔符则将数据和分隔符一起读取
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
了解了情况1,我们再来说说情况2,如果读取的数据不包含分隔符,则进入下面的步骤:
- 如果是非丢弃模式,先判断可读数据是否超过最大长度,若超过则开启丢弃模式,告知后续就算找到分隔符和本次读取的数据也是个整体也需要丢弃,然后跳过本次读取的数据。反之进入步骤2。
- 如果是丢弃模式,则直接跳过本次读取的数据,并累加丢弃数据的长度
tooLongFrameLength。
if (minDelim != null) {
//略
} else {
//不包含分隔符
//非丢弃模式
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
//设置tooLongFrameLength 为本次可读长度
tooLongFrameLength = buffer.readableBytes();
//跳过本次可读数据
buffer.skipBytes(buffer.readableBytes());
//开启丢弃模式
discardingTooLongFrame = true;
//如果是快速失败则抛个异常
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// 丢弃模式下,直接跳过本次读取的数据,并累加丢弃数据的长度
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
自此我们把DelimiterBasedFrameDecoder也给讲完了,来小结一下执行过程:
- 判断分隔符是否是换行符,如果是则直接复用行解码器进行解码,反之进入步骤2。
- 尝试截取包含某个分隔符的
ByteBuf。 - 如果存在这样的
ByteBuf且是丢弃模式,则将当前这个ByteBuf丢弃,并将丢弃模式设置为false,并将长度累加tooLongFrameLength设置为0,进入下一次的读取,反之按要求截取包含或不包含分隔符的ByteBuf返回。 - 如果不存在符合要求的
ByteBuf且非丢弃模式,先判断是否超过最大读取长度,若超过直接跳过本次读取并开启丢弃模式,返回null。 - 再次读取若还是没有遇到分隔符直接丢弃
ByteBuf并累加tooLongFrameLength。
# LengthFieldBasedFrameDecoder源码分析
LengthFieldBasedFrameDecoder算是比较经典且常用的解码器了,它会按照我们提供的偏移量和长度找到描述长度的字段,并按照长度进行读取。
同样的decode源码也是比较长,我们还是以分段的方式来说明,默认情况下discardingTooLongFrame为false,我们这里假设为true,因为前几次获取数据流并解码过程中遇到了超长的数据,需要进行丢弃操作,所以它的执行步骤为:
- 如果数据过长,比较可读字节以及
bytesToDiscard的最小值,获取实际需要丢弃的大小。 - 跳过要丢弃的字节。
- 更新
bytesToDiscard的值。
//如果数据过长
if (discardingTooLongFrame) {
//获取当前可以跳过的最小值
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
//跳过需要丢弃的位置
in.skipBytes(localBytesToDiscard);
//bytesToDiscard设置为读取后的字节数
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
//若需要则抛出个异常
failIfNecessary(false);
}
2
3
4
5
6
7
8
9
10
11
12
13
然后再判断是否还有可读字节并判断是否小于lengthFieldEndOffset(即描述长度的字段的偏移量),若小于则说明当前数据包可能还未接收完整,直接返回null。
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
2
3
走到这里说明可以尝试截取数据包了,它的执行步骤为:
- 精确定位到描述长度的字段的位置。
- 获取
length存放的长度,如果小于0抛出异常,反之进入下一步。 - 判断数据包长度是否合法,如果数据包长度小于数据包起始位置到长度字段的位置,则说明当前数据包有问题,则跳过这几个字节,并抛出异常,反之进入步骤4。
- 将
frameLength(表示数据字段的长度)加上lengthAdjustment(长度调整大小值)和lengthFieldEndOffset(长度字段的偏移量,可以直接理解为长度字段之前的长度)获取实际数据包的长度,如果数据包长度小于数据包起始位置到长度字段的位置,则说明当前数据包有问题,则跳过这几个字节,并抛出异常。
//获取长度字段的实际索引位置
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//获取长度字段的值
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
//如果长度值小于0则跳过这几个字节,并抛出异常
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
//将frameLength 加上lengthAdjustment 和lengthFieldEndOffset获取实际数据包的长度
frameLength += lengthAdjustment + lengthFieldEndOffset;
//如果frameLength 小于lengthFieldEndOffset则说明包不完整,直接抛异常
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
得到正确的数据包长度frameLength 后,进行如下步骤:
- 判断长度是否超长,若超长进入步骤2。
- 计算
frameLength和可读字节的差discard。 - 如果小于0,则说明超长的部分小于可读字节数,直接跳过即可,反之执行步骤4。
- 若大于0,则将丢弃模式设置为
true,并将discard赋值给bytesToDiscard,并跳过可读字节。 - 完成跳过步骤后,调用
failIfNecessary,若有必要抛出一个异常。 - 最后判断
frameLength是否小于可读字节,若是说明包还未收完整,直接返回null,下一次收到新数据再次读取并尝试。
//数据包如果超过最大长度
if (frameLength > maxFrameLength) {
//计算超出的部分
long discard = frameLength - in.readableBytes();
//将frameLength设置为超长长度存到tooLongFrameLength 中
tooLongFrameLength = frameLength;
//若小于0说明frameLength 小于可读字节数,直接跳过即可
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// 若大于0,则将丢弃模式设置为true,并将discard设置给bytesToDiscard ,并跳过可读字节
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
//如果可读部分小于frameLengthInt 说明包还未受完整,返回null
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
接下来就是完整的拆包部分了,执行步骤为:
- 判断要跳过的字节数是否大于计算的数据包长度,若是则跳过并抛异常,反之进入步骤2。
- 从起始位置开始跳过
initialBytesToStrip设置的字节数。 - 计算跳过后实际要截取的长度
actualFrameLength。 - 截取数据包得到一个
frame并返回。
//判断要跳过的字节数是否大于计算的数据包长度,若是则跳过并抛异常
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
//跳过initialBytesToStrip的字节数
in.skipBytes(initialBytesToStrip);
// 计算跳过后实际要截取的长度actualFrameLength
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//截取数据包得到一个frame 并返回
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
自此我们将LengthFieldBasedFrameDecoder也分析完成了,来小结一下整体步骤:
- 判断是否是丢弃模式,如果是且数据过长,则丢弃localBytesToDiscard个数据。
- 丢弃完成后若还是有数据可读,则尝试获取长度字段的位置,并判断当前数据长度是否足以保证读取到长度位置的字段,若不可以则抛异常,反之读取长度的值。
- 计算总长度
frameLength是否小与数据包起始位置到长度字段位置的长度,如果小于则说明数据有问题直接抛异常,反之进入步骤4。 - 基于上述步骤计算的总长度
frameLength来到步骤4则判断数据包是否超长,如果超长则丢弃,若小于可读字节数,说明数据包还没读完,等待下一次收到字节流再次从步骤1开始执行,反之按照设置的要求拆包并返回ByteBuf。
# 参考文献
跟闪电侠学 Netty:Netty 即时聊天实战与底层原理:https://book.douban.com/subject/35752082/ (opens new window) Netty学习(五)-DelimiterBasedFrameDecoder:https://blog.csdn.net/a953713428/article/details/68231119 (opens new window)