Netty网络传输简记
# 前言
在上一篇文章JavaNetty基础概念入门小结 (opens new window)对Netty有了简单的认识,这一篇文章我们不妨对Netty进行进一步的探讨,我们不妨来聊聊Netty的传输。如果你尝试过JDK原生的网络编程,你就会发现使用原生API实现一次简单的网络交互有多么的繁琐,好在Netty为我们封装了许多的细节,我们只需按照其约定实现自己所需的逻辑细节即可,同样使得我们的代码没有被繁琐的创建过程污染,大大提升Java网络编程开发效率。
# 不通过Netty的OIO和NIO
# OIO使用案例
这里我们不妨看看原生的OIO是如何实现的吧,代码如下所示,整体步骤也很固定:
- 创建ServerSocket
- 获取clientSocket
- 阻塞监听clientSocket
- accept收到消息后创建线程处理消息
- 返回给客户端欢迎语句后关闭流
public void serve(int port) throws IOException {
//将服务器绑定到指定端口
final ServerSocket socket = new ServerSocket(port);
try {
for(;;) {
//接受连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//创建一个新的线程来处理该连接
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
//将消息写给已连接的客户端
out = clientSocket.getOutputStream();
out.write("你好,这里是原生OIO服务端的响应\r\n".getBytes(Charset.forName("UTF-8")));
out.flush();
//关闭连接
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
//启动线程
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
了解完代码整体流程之后,我们直接编程启动代码进行测试
public static void main(String[] args) throws IOException {
new PlainOioServer().serve(7000);
}
2
3
启动后我们使用网络调试工具,首先将编码设置为UTF-8

然后连接我们上文编码所设置的7000端口。可以看到我们的客户端收到了服务端的响应。

# NIO使用案例
我们希望用NIO也实现上面所要做的事情,所以我们不妨也是用一下原生API实现一下NIO,可以看到做同样的事情代码完全不一样,使用原生JDK实现NIO时selector.select()等待事件通知,然后获取对应的selectorKey然后根据用户的请求将其注册到选择器上,非常繁琐。
public void serve(int port) throws IOException {
//打开ServerSocketChannel,设置为非阻塞
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
//将服务器绑定到选定的地址和端口
ServerSocket serverSocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
serverSocket.bind(address);
//打开Selector来处理 Channel
Selector selector = Selector.open();
//将ServerSocketChannel注册到Selector以接受连接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;){
try {
//等待需要处理的新事件;阻塞将一直持续到下一个传入事件
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
//handle exception
break;
}
//获取所有接收事件的SelectionKey实例
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
//遍历接受的接收事件的SelectionKey实例
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
//检查事件是否是一个新的已经就绪可以被接受的连接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
//接受客户端,并将它注册到选择器
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
//检查套接字是否已经准备好写数据
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
//将数据写到已连接的客户端
if (client.write(buffer) == 0) {
break;
}
}
//关闭连接
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// ignore on close
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
同样的我们也给出测试代码,感兴趣的读者可以建立客户端或者使用网络调试助手和NIOServer建立连接了解一下NIO处理过程,其实无非就是通过选择器阻塞等待事件通知,如果是建立连接请求我们则判断这个请求是写(selectionKey.OP_WRITE)还是读(SelectionKey.OP_READ),然后进行将其注册到选择器上,通过下一次循环将其处理后再移除,最后关闭连接。
public static void main(String[] args) throws IOException {
new PlainNioServer().serve(7000);
}
2
3
# 使用Netty实现OIO和NIO
# 使用Netty实现OIO
了解原生的API实现之后,我们不妨看看使用Netty版本的实现,代码如下,逻辑非常清晰:
- 创建EventLoopGroup
- 创建EventLoopGroup绑定EventLoopGroup
- 为EventLoopGroup设置管道,当有新连接建立时为这个管道创建ChannelHandler,并为ChannelHandler绑定对应拦截和处理事件。
- 最后使用异步的方式启动连接和等待关闭。
public void server(int port)
throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi This is NettyOioServer!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
//创建 ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//使用 OioEventLoopGroup以允许阻塞模式(旧的I/O)
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//添加一个 ChannelInboundHandlerAdapter以拦截和处理事件
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(
//将消息写到客户端,并添加 ChannelFutureListener,
//以便消息一被写完就关闭连接
ChannelFutureListener.CLOSE);
}
});
}
});
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//释放所有的资源
group.shutdownGracefully().sync();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
调试代码如下,读者可以通过网络调试助手尝试连接7000端口即可或者服务端的回复。
public static void main(String[] args) throws Exception {
new NettyOioServer().server(7000);
}
2
3
如下图所示,这就是NettyOiO的响应。

这里我们不妨详细的阐述一下它的工作过程:
- 我们创建ServerBootstrap时为其绑定了一个EventLoopGroup。
- 然后我们为EventLoopGroup添加我们所需的OioServerSocketChannel,并指定地址和端口,一旦收到用户请求就会交由childHandler中指定的channelHandler处理。
- 我们的channelHandler指定了连接建立后的回调方法channelActive,该方法会发送给用户一个欢迎的消息,如上文输出结果所示,这就是NettyOiO的整体工作过程。
# 使用Netty实现NIO
同理我们也用Netty编写一个NIO的代码,可以看到代码的模板和上文OIO差不多,只不过变换了几个参数而已,例如我们将group设置为NioEventLoopGroup ,Channel设置为NioServerSocketChannel,其实重复操作的逻辑可以说时基本没有变化,相比于原生的JDK的API,Netty对于逻辑的抽象和复用可以说是比较完美了。
public void server(int port) throws Exception {
final ByteBuf buf =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n",
Charset.forName("UTF-8")));
//为非阻塞模式使用NioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
//添加 ChannelInboundHandlerAdapter以接收和处理事件
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(
//将消息写到客户端,并添加ChannelFutureListener,
//以便消息一被写完就关闭连接
ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate())
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
}
);
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//释放所有的资源
group.shutdownGracefully().sync();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Netty支持的几种比较常用到的传输
# 非阻塞传输NIO
NIO提供全异步的IO操作,实际上它是通过jdk1.4的nio包的selector实现,selector即我们常说的选择器,选择器我们完全可以将其看作是一个注册表,当注册的channel发生变化时,选择器就会得到通知 从上文原生jdk的示例中,我们也不难看出,选择器轮询时获取到的事件大抵有以下几种:
- OP_ACCEPT:新的channel已被接收并且就绪。
- OP_CONNECT:请求建立一个连接获得通知。
- OP_READ:请求数据已经就绪,可以从channel中处理该数据。
- OP_WRITE:当请求时可以向底层缓冲区写入数据时,选择器就会收到该类型通知。
将上面的几个点串联起来之后,我们就知道nio的工作模式:
- 不断有新的channel注册到选择器上。
- 通过选择器监听变化的channel。
- 如果感知到变化则处理变化,如果没有发生状态改变,则选择器的线程会处理别的任务。
- 收到变化或者其他任务执行结束后继续将执行权交由选择器继续阻塞监听channel的变化。

了解过NIO的朋友都知道,NIO还有一个重要的特性就是零拷贝,零拷贝使得网络IO数据传输时,数据无需经历用户空间到内核空间拷贝的步骤,这使得网络传输效率一下子提高不少,但是零拷贝也是有局限性的,它仅仅支持某些操作系统,而且对于加密过的数据,它也是无法完整传输的。
# Linux本地非阻塞传输Epoll
Linux作为高性能网络编程平台,NIO为其提供了一种更加符合其特性的网络IO模型——epoll,一种高度可扩展的IO事件通知特性,该模型的存在使得Linux系统下的NIO实现一种更轻量级的中断。
我们都知道nio会在select会阻塞住等待通知的事件。其工作原理是程序将需要获取连接状态的文件描述符fd组成一个fds数组,然后拷贝到内核态,让内核去轮询查看哪个连接去完成,在此期间,程序是阻塞的,这也是为什么我们使用原生的nio开发调试时会发现 selector.select();在没有事件通知时会阻塞。

相比这种每次获取已完成的连接都需要进行用户空间拷贝到内核空间的模型,epoll模型则比之更加轻量级。可以看到当用户调用select时,用户无需将需要获取结果的文件描述符拷贝给系统内核,因为系统内核自己已经维护了一个监听连接的数组空间,所以当内核收到请求时,内核直接自己去询问连接状态并将结果返回给用户态线程即可。

所以,如果你的服务需要运行在Linux上,还是建议使用epoll模型的NIO,实现方式也很简单,将NioEventLoopGroup改为EpollEventLoopGroup,以及将NioServerSocketChannel改为EpollServerSocketChannel即可。代码如下所示:
public class NettyNioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
//为非阻塞模式使用NioEventLoopGroup
// NioEventLoopGroup group = new NioEventLoopGroup();
EpollEventLoopGroup group = new EpollEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(EpollServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
//添加 ChannelInboundHandlerAdapter以接收和处理事件
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(
//将消息写到客户端,并添加ChannelFutureListener,
//以便消息一被写完就关闭连接
ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate())
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
}
);
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//释放所有的资源
group.shutdownGracefully().sync();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 小结
上文我们介绍Netty中比较常见网络传输,对于需要处理高并发连接的服务器,我们十分推荐使用NIO,尤其是Linux环境,我更加推荐使用更加轻量级中断的epoll相关的API。