nio的好伙伴——netty

Java基础

浏览数:88

2020-6-13

NIO的那些事

我们在前段时间学习了IONIO的一些概念性的东西,并且写了一些简单的例子进行实践,虽然简单,但基本上覆盖了NIO的一些最基本的概念了。
如果还没看过的,如果翻一下之前的文章了解一下,或者看一下网上的其他文章。

JAVANIO的那些痛

既然我们学过NIO,那我们以JAVANIO来举个例子,说明一下我们使用NIO的一些基本流程:

  1. 打开ServerSocketChannel(Server端)或SocketChannel(Client端),监听对应的端口或连接对应的端口
  2. 设置configureBlocking(false)为非阻塞
  3. 通过register注册要监听的描述符
  4. 通过Selector.open打开Selector
  5. 调用Selector.select得到已就绪的SelectionKey
  6. 遍历SelectionKey进行相应的处理

这里我把之前的某些步骤合并了,可能跟之前有前面的文章有点不一致,但总体步骤是一样的。

其实,上面的步骤我们大可以了解到,我们真正需要关注的步骤只是第6步,或者说是我们真正要处理IO事件的一些逻辑,其他的都是一些通用流程而已。

既然如此,我们真的有必要把时间花费在这些通用的地方吗?

偷懒的程序员肯定不想这样做,所以有人开发了minanetty一类的NIO框架,旨在把程序员从这些烦杂的通用流程中释放出来,而是只关注真正的业务逻辑,把这些交由框架去做处理。

minanetty的作者都是同一个人(Trustin Lee,牛人总是各种牛)。

但鉴于netty基本上已经是事实上的NIO标准框架了,并且社区一直比较活跃,而mina已经归档很久了,都已经没更新很多年了。为了避免精力太过分散(其实是我没学习过mina,不懂-_- ),我们这里不讨论mina,直接学习netty,里面有很多值得我们学习的东西。

前置知识

线程模型

在开始介绍netty相关的知识前,我们来了解一下线程模型相关的一些知识,这里参考了很多网上的一些文章,加以自己整理了一下,希望能够给一些看其他文章不清楚的朋友一些不一样的理解。

单线程模型

图片来自:
https://www.jianshu.com/p/738…

这里的单线程指的是分派线程和工作线程都在同一个线程,可以看回我们的JAVANIO示例代码,这里为了方便,我们也贴在下面:

public class MyServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        String str = "";
        while(!Thread.currentThread().isInterrupted()) {
            //这里是一直阻塞,直到有描述符就绪
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //连接建立
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                }
                //连接可读,这时可以直接读
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    try {
                        int num = socketChannel.read(readBuffer);
                        str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}

我们可以看到在我们nio的例子中,我们没有明确使用多线程,这里就是使用了单线程来处理的。
它有什么好处呢?

实现简单。这是当然的,所有不涉及到多线程的代码都是相对比较简单的,注意,是
相对

有优点的同时肯定有缺点,那么这种单线程有什么缺点呢:

性能相对比较差。只有一个线程进行请求的处理,也就是只有一个线程处理
CPU的描述符,假设同一时间有很多信号都就绪了,并且我们读到
IO数据后的真正处理逻辑可能比较复杂,那么所有的请求都需要等待当前的请求处理完成后才能处理其他的。这也就导致了它的性能相对(这里的相对是对比其他多线程的处理方式)比较弱。

多线程模型

图片来自:
https://www.jianshu.com/p/738…

这里的多线程指的是处理逻辑的多线程,对应到我们的NIO代码逻辑里面就是对SelectionKey的处理是多线程的,我们直接看代码会直观点:

public class MyServerMultipleThread {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            //这里是一直阻塞,直到有描述符就绪
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //连接建立
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //连接可读,这时可以直接读
                else if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    int num = socketChannel.read(readBuffer);
                    new Thread(() -> {
                        String str = new String(readBuffer.array(), 0, num);
                        System.out.println("received message:" + str);
                    }).start();
                }
            }
        }
    }

}

这里我们可以看到,在进行SelectionKey遍历读完数据后真正处理的时候,我们新起了一个新的线程进行NIO的相关处理。

当然,这里的只是一个示例,真正写代码的时候不应该这样无限制的新起线程,而是应该使用线程池,更合理的使用线程,避免线程数量太多,导致
CPU切换太频繁,这样反而起不到优化性能的作用。

主从多线程模型

图片来自:
https://www.jianshu.com/p/738…

一看到这图,估计很多人头都大了,这都什么鬼,这么复杂啊。
实际可以简单一点理解:

  • 原来的接收请求是单线程,现在变成了多线程(线程池)
  • 原来的处理逻辑是单线程,现在是使用多线程(线程池)

注意,这里的多线程不包括
accept请求,
accept还是由单个线程进行分发。

我们直接看一下代码会比较容易理解

public class MyServerMultipleThread2 {

    @SuppressWarnings("Duplicates")
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8001));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while(!Thread.currentThread().isInterrupted()) {
            selector.select();
            //这里是一直阻塞,直到有描述符就绪
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //连接建立
                if (key.isAcceptable()) {
                    try {
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        clientChannel.configureBlocking(false);
                        clientChannel.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //连接可读,这时可以直接读
                else if (key.isReadable()) {
                    new Thread(() -> {
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int[] num = new int[]{0};
                        try {
                            num[0] = socketChannel.read(readBuffer);
                            new Thread(() -> {
                                String str = new String(readBuffer.array(), 0, num[0]);
                                System.out.println("received message:" + str);
                            }).start();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }).start();
                    key.channel().register(key.selector(), SelectionKey.OP_WRITE);
                }
            }
        }
    }
}

这里我没有参考一些网上比较复杂的做法,可能实现起来不大一致,但相对容易理解一点。

  1. 我们在接受到accept请求时,还是由单前线程单独处理
  2. READWRITE等请求时,我们新起一个线程去做处理,并且在真正的处理逻辑时,还是跟上面的多线程逻辑一样,是新起一个线程去做处理。
  3. 我们在处理READWRITE时,注意,需要重新注册相应的WRITEREAD事件——因为新起线程后,当前SelectionKey的信号还是READ,如果我们不做修改,会导致当前的线程会重复多次处理。具体大家可以下来试试,把后面的register去掉,看一下会出现什么情况。

我们看到,上面的线程模型,都以性能提升为目的,一步步去进行优化,但同时我们也看到了,代码是越来越复杂,使得我们在维护我们真正的逻辑时,有点像是大海捞针,真正的代码逻辑就那么一点,而很多都是一些模板代码。

为了解决这些问题,就需要引出我们的框架了,框架正是为了帮我们去约定好一些通用的逻辑而出现的,比如
spring,帮我做好了
IOC
AOP等的一些逻辑,这些不需要我们去额外关注;而
mybatis帮我们做好了
ORM相关的一些处理,
DB映射等,这些流程化的东西都已经固化了;而我们这里要说的
netty,它帮我们把
NIO这些线程模型相关的东西帮我们做了很多的优化和抽取,我们不再需要管这些流程化的东西,只需要写我们自己的逻辑。

netty出场

netty作为一个高性能的NIO框架,基本上已经是事实上的NIO标准了,包括dubbozookeeper等内部都比较大量地使用了netty。或者说具体点,这些框架能够有这么好的性能,大部分功劳要归结到netty身上。

netty基础知识

看例子前我们先来补充一些基础知识。
netty有几个重要概念:

  • ChannelHandler

channel的事件处理器,里面封装了针对当前
channel的生命周期的方法

  • ChannelInBoundHandler

channel
READ请求处理器,里面封装了当前
channel的对于接收请求相关的生命周期方法

  • ChannelOutBoundHandler

channel
WRITE请求处理器,里面封装了当前
channel的对象发出请求的生命周期方法。

  • ChannelPipeline

此类是
netty架构中比较重要的一个类,它使用了
责任链模式,把请求从
ChannelHandler中一个个的往后传递,最终到达我们的业务
Handler。关于
Pipeline的详细描述,我们后面再详细看看。

  • ByteBuf

netty封装了自己的
ByteBuf,与
JDK自带的
ByteBuffer的最主要的区别是它有两个指针,一个供读
readerIndex,一个供写
writerIndex。而至于该类的一些详细信息,大家可以看一下它的
JavaDoc,写得非常详细。

关于OutBound和上面的InBound的区别,大家可以简单地区分一下,In就是请求进入,对应的就是READOut就是请求发出,对应的就是WRITE

基本的概念了解清楚了,那我们来看一下简单的例子。
其实netty最好的文档是它的官网文档。我们就还是以类似官方源码里面的一个example来学习一下,实现的功能很简单:

Client连接成功后传一句话给
Server
Server回复收到。

实战

server

ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client:" + msg);
        ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

我们的代码比较简单,打印收到的文本,并且再发回一条语句。我们可以看到我们输出的时候加多了一个
换行符——
System.lineSeparator(),这是为什么呢?

这里涉及到另外一个
TCP/IP一个比较重要的问题,
拆包和粘包,这里我们先不细说,后面我会有专门的文章来说一下
拆包和粘包还有一系列
TCP/IP相关的知识,这是非常大的一块了。我们现在就先简单的知道,加这个
换行符是为了让
Handler知道我们的消息从哪里结束。

Server
public class MyNettyServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture channelFuture = null;
        try {
            channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

这里涉及到比较多的知识点,整体结构我们先不管它,我们重要先关注一下:

  • 线程池

这里定义了两个线程组进行处理,
BossGroup
WorkerGroup,对应我们上面的
多线程模型,原因是
netty并不使用
主从多线程模型——这个我们以后的文章有机会再细说。

  • ServerBootStrap

netty工具类,有助于编写服务器的相关代码,而
Client端对应的就是
Bootstrap了。

  • pipeline的添加
ch.pipeline().addLast(new LineBasedFrameDecoder(4096));                 ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new MyNettyServerHandler());

这里把4个Handler添加到Pipeline的末尾,至于为什么是末尾,相应看到后面的pipeline的解析的时候大家就会知道了。
我这里大概描述一下几个Handler的作用:

  • LineBasedFrameDecoder

根据换行符
\n
\r\n进行内容的分割——即
拆包

  • StringDecoder

把接收到的内容解析为
String字符串

  • StringEncoder

把发出的内容解析为
String字符串

  • MyNettyServerHandler

我们的真正逻辑处理类,这个应该是在前面的几个处理完成后再进行。我们在后面的
pipeline执行顺序中可以看到为什么这样添加。

后面的
Client中的
Handler也可以参考上面的。

Client

ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("helloworld" + System.lineSeparator());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from server:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

这里我们的代码也比较简单,就是连接成功的时候发条helloworld过去服务端,然后再从服务端读到返回的内容。我们就不细说了。

Client
public class MyNettyClient {

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(4096));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new MyNettyClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);

        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

对比上面的Server代码,这里的区别,最大的就是我们只有一个EventLoopGroup,因为Client端并不需要接收请求,所以并不需要所谓的BossGroup

一切就绪后,我们可以跑一下看看运行情况:
先运行server,再运行client
server可以看到

client可以看到

这表示我们已经使用netty写了一个基本可以用的NIO程序了。

ChannelPipeline详解

ChannelPipeline作为netty的一个底层重要组成部分,ChannelHandler都需要依靠它进行调度,重要性不言而喻。那我们现在就一起来看看ChannelPipeline究竟是怎么调度的。
查看ChannelPipelineJavaDoc我们可以看到这样一串描述(牛人写描述都是特别认真的)。

大概的意思就是这样的:

  1. 请求进来时,按照InBoundHandler的添加顺序,从前往后执行。
  2. 请求出去时,按照OutBoundHandler的添加顺序,从后往前执行。

另外,文档中又举了一个例子:

我们套用一下我们的Server例子来分析一下:
LineBasedFrameDecoder,StringDecoder,StringEncoder,MyNettyServerHandler

当我们收到消息时,需要执行的
Handler的顺序为:
LineBasedFrameDecoder,
StringDecoder,
MyNettyServerHandler

当我们发出消息时,需要执行的
OutboundHandler的顺序为:
StringEncoder.

基于上面的分析,我们就可以分析为什么我们前面的例子可以得到那样的结果。

总结

这篇文章,我们从一开始的线程模型到后面的netty的示例,这些种种都是为了性能的提高去做的一些优化。在当前大数据的趋势下,更多需要我们把性能去做到极致。
后面,我们会再根据netty中的一些最佳实践来分析它是怎么解析粘包和拆分的。

参考文章

https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html

作者:shun记忆