Netty服务端接收的新连接是如何绑定到worker线程池的?

C#

浏览数:268

2020-6-13

 

更多技术分享可关注我

前言

原文:Netty服务端接收的新连接是如何绑定到worker线程池的?

前面分析Netty服务端检测新连接的过程提到了NioServerSocketChannel读完新连接后会循环调用服务端Channel绑定的pipeline.fireChannelRead()方法,将每条新连接打包当做参数传入,然后通过这个方法将其沿着服务端Channel的pipeline传递下去,即在Channel的handler链条上流动,这部分细节后续会详细分解。 

下面看下,新连接在服务端Channel的pipeline的流动过程中,Netty配置的boss线程池和worker线程池是如何配合的。

服务器的新连接接入器源码分析

简单回顾前面文章:Netty是如何处理新连接接入事件的?中分析了Netty服务端检测新连接的过程,回忆NioMessageUnsafe类的read()方法源码:

看最后的红色方框,是在循环中将新连接顺着Channel的pipeline传递下去,NioMessageUnsafe是前面说的Netty的Channel的内部接口——Unsafe的服务端的实现类。

那么这些新连接后续被传递时会发生什么呢?这也是重点问题——即Netty客户端新连接的Channel被封装后,如何与Netty的I/O线程关联。下面看之前提到的新连接接入器,关联的功能主要是这个接入器实现。

言归正传看ServerBootstrapAcceptor源码,它是一个内部类,继承了ChannelInboundHandlerAdapter(后面详解Netty的pipeline机制)。

现在先复习一下服务端启动流程。服务端启动的核心操作是绑定端口,即在用户代码中serverBootstrap.bind(xx);方法中启动,里面会调用ServerBootstrap的doBind方法,在doBind方法里调用了ServerBootstrap的initAndRegister()方法,这是一个初始化服务端Channel并注册I/O多路复用器的方法,如下图:

该方法通过反射创建了服务端的NioServerSocketChannel,并且创建保存了JDK的ServerSocketChannel以及一些组件,比如pipeline等,接着执行Channel的初始化操作——即ServerBootstrap的init(channel)方法(分析的是服务端代码,故只看ServerBootstrap类对init的实现),init方法里就有新连接接入器的创建逻辑。如下红框处,在init里配置服务端的pipeline时,默认添加了一个ServerBootstrapAcceptor handler:

先捋一捋完整过程:

1、首先ServerBootstrap的init方法为服务端Channel的pipeline添加了一个ChannelInitializer,在该类实现的void initChannel(Channel ch)方法里先将用户代码里配置的服务端的handler添加,前面我也说过,这个服务端的handler配置一般很少用到(即.handler() API),常用的主要是给客户端配置handler,即.childHandler()

2、然后异步的添加一个新连接接入器——ServerBootstrapAcceptor,具体的,是把添加ServerBootstrapAcceptor到pipeline的操作封装为了一个task,委托给服务端的NIO线程异步执行,等到有新连接到来时,该task已执行完毕。即Netty服务端Channel的pipeline最小结构如下:

这里提前接触Netty的入站事件和出站事件的概念,所谓入站事件——即inbound事件,即Netty的NIO线程主动发起的,是面向用户业务handler的操作,即都是被动发起的事件,通过fireXXX方法传播。

比如Channel连接成功,Channel关闭,Channel有数据可读,Channel上注册I/O多路复用器成功,Channel解除I/O多路复用器的注册,异常抛出等,这些都是被动执行的回调事件,它们的处理有专门的handler实现,统一叫入站handler。反之还有出站事件和出站handler,出站事件——即outbound事件,都是用户线程或者用户代码主动发起的事件,如下是出站事件:

比如服务器主动绑定端口,主动关闭连接,客户端主动连接服务器,服务器(客户端)主动写出消息等操作,这些事件的特点就是由用户主动发起。针对这两类事件,除了Netty默认提供的handler,用户还可以自定义入站/出站handler以实现自己的拦截逻辑,这也是职责链(也叫责任链)模式的思想。

言归正传,继续分析服务器读取新连接的过程,现在分析的是新连接接入,故只看入站handler。先知道入站事件流动的顺序是从pipeline的头部节点开始,途径各个入站handler节点,一直流动到尾部节点结束,这里就是Head->ServerBootstrapAcceptor->Tail。如下:

还得知道tail节点本质是一个入站handler,head节点本质是一个出站handler,后续会详细拆解,这里不知道为什么也无所谓。

前面说到,NioMessageUnsafe类的read()方法,最后会将读到的客户端新连接传递出去,如下:

具体来说是触发后续的各个入站handler的ChannelRead事件(前面说了ChannelRead是一个入站事件),入站事件都是从pipeline的头部节点——HeadContext开始传播的,而触发这个事件传播的正是pipeline.fireChannelRead(xxx)方法。

还记得服务端启动的时候,如下有一段代码:serverBootstrap.handler(new ServerHandler())serverBootstrap.childHandler(new ServerHandler());

当时给了这样一个结论:.handler方法添加的handler是添加到服务端Channel的pipeline上,是在服务端初始化的时候就添加的,而.childHandler方法添加的handler是添加到客户端Channel的pipeline上,是在处理新连接接入的时候添加的。现在知道原因了,ServerBootstrap调用init时,先pipeline.addLast(handler),然后添加一个ServerBootstrapAccepter,这样服务端的pipeline也可能是head-hander>serverBootStrapAccepter>tail这种组成结构,如下(很熟悉的结构):

这里一定要明白,两个操作是分别把handler加到了服务端和客户端的pipeline。

serverBootStrapAccepter本身也是一个入站的handler。根据前面的分析,入站事件的传播顺序是head->用户定义的入站handler->ServerBootstrapAcceptor->tail,我的demo里没有为服务器定义handler,故直接调用到ServerBootstrapAcceptor的channelRead方法,该方法是接入器的重点,需要重点学习,ServerBootstrapAcceptor的channelRead方法源码如下;

ServerBootstrapAcceptor是ServerBootstrap的一个内部类。下面看debug过程,一上来就把msg强转为了Channel,即这里接收到的msg变量本质是刚刚读取到的客户端新连接——被Netty封装为了其自定义的Channel。后续的ServerBootstrapAcceptor主要做了三件事:

1、黄色1处,就是前面分析的,在接入器里添加用户配置的客户端Channel的handler:即将用户在服务器代码里通过.childHandler()自定义的ChannelHandler添加到客户端的pipeline,后续详解。

2、黄色2处,设置用户配置的options和attrs,主要是设置客户端Channel的childOptions和childAttrs,childOptions是channel底层为TCP协议配置的属性,childAttrs是channel本身的一些属性,它的本质是个map,比如可以存储当前channel存活时间,密钥等。

3、黄色3处,选择worker线程池的一根NIO线程,并将其绑定到该客户端Channel——即代码里的child变量。这步是异步操作,并通过register方法实现,这个方法复用了服务端启动时为服务端Channel注册I/O多路复用器的代码逻辑。这最后一步又分为两小步:

  • worker线程池通过EventLoop的线程选择器——Chooser的next()方法选择一个NioEventLoop线程和新连接绑定,和服务端线程池一样的逻辑

  • 注册客户端的新Channel到这个NioEventLoop的I/O多路复用器,并为其注册OP_READ事件

下面详细分析这两小步,我通过debug跟进register,来到了MultithreadEventLoopGroup的register方法,如下源码:

最后进入到父类io/netty/util/concurrent/MultithreadEventExecutorGroup类,看到这里就很熟悉了,会进入到前面分析过的NioEventLoopGroup的线程选择器。

这里使用的优化方法——通过位运算选择一个NioEventLoop线程。如下发现idx是0,即workerGroup线程池里的线程此时才刚刚选择第一个,因为这是我当前运行的服务器接收到的第一条客户端连接,所以后续再来新连接时,会顺次启动后续的线程与之绑定,如果绑定到最后一根,那么idx会重新从0开始,循环往复。。。注意此时NIO线程还没有启动。Netty做了优化,前面也说了,Netty的线程池都是延迟启动的。

在MultithreadEventLoopGroup类的register方法里选择NioEventLoop线程后,next()方法会返回一个NioEventLoop实例,然后继续调用该实例的register方法,即下一步过会跳转到NioEventLoop直接父类SingleThreadEventLoop的register方法,如下源码:

调用到了第二个register方法里,里面的channel()方法返回的就是客户端的NioSocketChannel,unsafe()方法就是NioByteUnsafe实例,即最后调用了客户端channel的Unsafe的register方法。即AbstractChannel的内部类——AbstractUnsafe的register方法,源码如下:

看到这个方法的代码就应该很熟悉了,我在前面Netty服务端启动的时候分析过,即给客户端新连接注册I/O多路复用器的逻辑复用了这一套代码,这也得益于Netty良好的架构设计。

下面再分析一下,执行AbstractUnsafe的register方法的逻辑:

1、首先对当前客户端的I/O线程以及Channel做校验,然后在黄色1处,判断当前线程是不是NIO线程,显然这里是false,因为虽然此时已经选择了一个客户端NIO线程,但是该NIO线程还没有启动,整个注册逻辑还是运行在用户线程下,我的demo是main线程,如下佐证,故1这里判断失败,接下来执行else里的代码,将真正的注册逻辑委托给刚刚启动的客户端的NIO线程异步执行,这样做也能保证线程安全。

2、看黄色2处,即else代码里,会通过NioEventLoop的execute方法启动之前选择的NIO线程(当然,如果已经启动了,那么会略过启动步骤),同时驱动注册的这个task,这里才真正启动NIO线程,也能佐证Netty的线程池实现了延迟启动,

3、最后看黄色3处,我进入到这个register0方法,看它的实现源码,如下:

最关键的方法是其中的doRegister()方法,看红色方框处。我进入该方法,发现其实现在了子类AbstractNioChannel里。这就非常熟悉了,还是和服务端注册ServerSocketChannel流程一样,如下:

正是Netty封装的JDK注册Channel的Selector的逻辑。在该方法里将客户端Channel注册到客户端NioEventLoop线程的I/O多路复用器,并将NioSocketChannel对象附加到JDK Channel,不过此时注册的感兴趣的I/O事件还是0,即什么都不关注,即该客户端Channel还处于初始化状态,真正注册I/O事件还在后面流程里。

注意该方法将注册逻辑写在了一个死循环里,学会这种用法,目的是为了保证一个事情必须完成,即使出现某些异常。

回到register0方法,再看一遍,注册完成后,会先触发处于挂起状态的handlerAdded事件,即先执行黄色1处的代码,这里对应了为该客户端新连接添加用户自定义的客户端handler的逻辑。然后才执行黄色2处,触发并传播当前Channel已经注册成功的事件。如果当前Channel依然存活,那么会继续执行3处的代码,即为首次注册的新Channel传播Channel成功连接(处于活跃状态)的事件。

最后,如果当前Channel不是第一次注册,那么会判断是否配置的自动读消息(Netty默认都是读优先),如果是,那么会执行黄色4处的代码,后续详解。

为新连接分配NIO线程和对新连接注册I/O多路复用器的核心——是理解ServerBootstrapAcceptor,并由此知道服务端Channel的pipeline最小构成:Head->ServerBootstrapAcceptor->Tail

理解ServerBootstrapAcceptor:

1.延迟添加childHandler——将自定义ChannelHandler添加到新连接的pipeline,必须等当前Channel注册I/O多路复用器完毕后,才会添加

2.设置options和attrs——设置childOptions和childAttrs

3.选择NioEventLoop并注册到Selector,核心是调用worker线程池的Chooser的next()方法选择一个NioEventLoop,通过其doRegister()方法,将新连接注册到worker线程绑定的Selector上。这里的新连接和Selector是多对一的关系。

欢迎关注

dashuai的博客是终身学习践行者,大厂程序员,且专注于工作经验、学习笔记的分享和日常吐槽,包括但不限于互联网行业,附带分享一些PDF电子书,资料,帮忙内推,欢迎拍砖!

作者:dashuai的博客