漫谈I/O多路复用与其在SS中的应用

Java基础

浏览数:268

2019-8-21

I/O多路复用概述

我们已经知道,Unix-like系统中一共有5种I/O模型:

  • 阻塞I/O(Blocking I/O);
  • 非阻塞I/O(Non-blocking I/O);
  • I/O多路复用(I/O Multiplexing);
  • 信号驱动I/O(Signal-driven I/O);
  • 异步I/O(Asynchronous I/O)。

本文只讲I/O多路复用,不会展开说其他每种I/O模型的细节。看官如果想全面了解,可以参考W. Richard Stevens等人所著的《UNIX网络编程卷1:套接字联网API(第3版)》。

我们先对I/O多路复用下一个自己的定义:

所谓I/O多路复用,就是指单个线程可以感知到多个I/O流的状态。当I/O流就绪时,触发执行相应的操作。

也就是说,“多路”的是I/O流,“复用”的是线程。由于在Linux系统中一切皆文件,因此“I/O流”这个词就能理解为文件描述符(file descriptor, fd),它可以代表真正的文件,也可以代表磁盘、设备、Socket等等。I/O多路复用的根本目的,是使得应用能够处理更多的并发,提高服务器的吞吐量。根据应用场景的不同,I/O多路复用有时也被称作事件驱动模型(Event-driven model),比如Redis里基于I/O多路复用自行实现的事件驱动库ae

下图是《Unix网络编程》书中给出的I/O多路复用流程图,以select()和UDP的recvfrom()系统调用为例。

在该示例中,客户端程序会首先调用I/O多路复用的系统调用select(),如果所有Socket里都没有数据报,该调用就会阻塞。一旦某个Socket有数据报准备好,select()就会返回可读,然后就调用recvfrom()将数据报中的数据从内核空间复制到用户空间。

下图示出5种I/O模型的对比。I/O多路复用本质上仍然是一种同步操作,但是与阻塞I/O相比,效率更高,不必一直“干等着”。而与非阻塞I/O相比,CPU也不必持续地检查流是否就绪,节省了很多CPU时间。

I/O多路复用的实现

在Linux系统中,存在有3种典型的I/O多路复用实现,即select、poll和epoll,它们的出现是有先后的。下面分别来看个大概,之后有时间的话,再根据内核源码来分析它们。

select

select方式早在上世纪80年代就已经出现了,在上一节图中出现过的select()系统调用的签名如下所示。

int select(
    int nfds, 
    fd_set *restrict readfds, 
    fd_set *restrict writefds, 
    fd_set *restrict errorfds,
    struct timeval *restrict timeout);

其中,readfds、writefds、errorfds是三个fd_set,即文件描述符的集合,分别代表读取、写入和异常的I/O流集合。nfds则表示检查[0, nfds – 1]这个范围内的fd,所以其值不应该是df的总个数,而是最大的fd值+1。timeout表示阻塞超时,为0代表立即返回,为NULL则代表一直阻塞直到有fd就绪。

select()系统调用的大致执行流程是:

  1. 将各个fd_set从用户空间复制到内核空间;
  2. 遍历[0, nfds – 1]范围内的每个fd,调用fd的poll()函数,检查其对应设备中是否有可用的流;
  3. 如果有流就绪,根据类型,将其加入对应的fd_set。否则就按照timeout设定阻塞当前线程,直到有流就绪或等待超时;
  4. select()调用返回可用的fd个数,并将各个fd_set从内核空间复制回用户空间。

select方式的实现比较简单,并且跨平台性非常好。当然其缺点也比较明显:

  • fd的最大值太小,一般为1024,由FD_SETSIZE宏来指定。
  • 每次调用都需要线性轮询每个描述符,高并发情况下开销比较大。
  • fd_set从用户空间到内核空间来回复制,没有必要。
poll

poll()系统调用的签名如下所示。

int poll(struct pollfd fds[], nfds_t nfds, int timeout);

实际上,poll方式与select方式的实现几乎是相同的,不过它用pollfd结构替代了上面的fd_set结构而已。另外,它改用链表实现fd的存储,因此消除了select方式中fd的最大值限制,但其他方面没有明显的改善。

epoll

epoll直到Linux 2.6版本的内核才出现,它是对select/poll真正意义上的改进。它提供了3个系统调用。

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll_create()函数的作用是创建一个epoll句柄(可以理解为一个epoll专用的fd),size参数指定需要检查多少个fd。

epoll_ctl()函数则是向句柄epfd注册、删除或修改需要监听的fd及事件event。epoll用epoll_event结构来描述事件类型与数据,在内核缓存中用红黑树保存,epfd作为根。另外,还会给中断处理程序注册一个回调,使得内核在句柄的中断到来时,标记fd为就绪。

epoll_wait()函数等待注册在句柄epfd上的事件发生,发生之后,就将事件和已就绪的fd放入event数组中,或者阻塞直到timeout超时。epoll使用链表来维护已经就绪的事件和fd,因此链表中有无数据就可以代表有无事件发生。

epoll相对于select的优点如下:

  • 没有fd数的限制;
  • 不采用轮询的方式检测fd是否可用,而是在事件触发后采用类似callback的机制通知,由O(n)变为O(1);
  • 利用mmap(在关于零拷贝的文章中讲过)映射内存空间,减少复制和修改的开销。

epoll的事件有两种触发方式,即水平触发(Level triggering, LT)和边缘触发(Edge triggering, ET)。这是源自电子学的术语,下面两个图分别示出高电位触发和上升沿触发。写到这里,还是要感谢一下我邮啊。


epoll中的水平触发是默认工作方式。当内核通知一个fd已经就绪时,程序就可以进行I/O操作了。但是如果本次不处理该fd,当下一次调用epoll_wait()时,内核仍然会再次通知,直到该fd被处理为止。水平触发也是select和poll采用的工作方式。

边缘触发则是epoll特有的工作方式,当fd在事件的发生的当时从未就绪变为已就绪的状态,内核会通知该fd的状态变化,并假定程序已经感知到了这种变化。如果未处理该fd,在下一次调用epoll_wait()时,内核也不会再通知了。

由此可见,水平触发与边缘触发各有各的好处。水平触发能够保证数据的完整性,但是仍然存在内核空间到用户空间的拷贝。边缘触发由于只需通知一次,大大减少了内核的资源占用,但同时也不再保证数据完整,需要程序做额外的处理。

SS中的I/O多路复用

SS到底是个什么东西?我也不知道呢。
其实,I/O多路复用在其他组件(比如Java NIO、Redis、Nginx)中有更广泛也更典型的应用,但是六一儿童节那天,笔者的五六架梯子一夜之间全倒,折腾得焦头烂额,于是就拿SS源码来做个纪念吧。

下图示出SS的基本工作原理,通俗易懂,我就不多讲了。

早期的SS Server在处理SS Local端发来的加密的请求时,采用的是多线程+阻塞I/O方式,每建立一个连接就新起一个线程来处理,在并发量大的情况下,过多的线程会造成性能问题。在目前较新版本的SS中,已经采用Python提供的I/O多路复用库select来解决,它就是Linux系统中I/O多路复用的统一封装。

下面来看看原版SS 2.8.2版本中EventLoop类的源码。顾名思义,它也是事件驱动的。

class EventLoop(object):
    def __init__(self):
        if hasattr(select, 'epoll'):
            self._impl = select.epoll()
            model = 'epoll'
        elif hasattr(select, 'kqueue'):
            self._impl = KqueueLoop()
            model = 'kqueue'
        elif hasattr(select, 'select'):
            self._impl = SelectLoop()
            model = 'select'
        else:
            raise Exception('can not find any available functions in select '
                            'package')
        self._fdmap = {}  # (f, handler)
        self._last_time = time.time()
        self._periodic_callbacks = []
        self._stopping = False
        logging.debug('using event model: %s', model)

    def poll(self, timeout=None):
        events = self._impl.poll(timeout)
        return [(self._fdmap[fd][0], fd, event) for fd, event in events]

    def add(self, f, mode, handler):
        fd = f.fileno()
        self._fdmap[fd] = (f, handler)
        self._impl.register(fd, mode)

    def remove(self, f):
        fd = f.fileno()
        del self._fdmap[fd]
        self._impl.unregister(fd)

    def add_periodic(self, callback):
        self._periodic_callbacks.append(callback)

    def remove_periodic(self, callback):
        self._periodic_callbacks.remove(callback)

    def modify(self, f, mode):
        fd = f.fileno()
        self._impl.modify(fd, mode)

    def stop(self):
        self._stopping = True

    def run(self):
        events = []
        while not self._stopping:
            asap = False
            try:
                events = self.poll(TIMEOUT_PRECISION)
            except (OSError, IOError) as e:
                if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                    asap = True
                    logging.debug('poll:%s', e)
                else:
                    logging.error('poll:%s', e)
                    import traceback
                    traceback.print_exc()
                    continue

            for sock, fd, event in events:
                handler = self._fdmap.get(fd, None)
                if handler is not None:
                    handler = handler[1]
                    try:
                        handler.handle_event(sock, fd, event)
                    except (OSError, IOError) as e:
                        shell.print_exception(e)
            now = time.time()
            if asap or now - self._last_time >= TIMEOUT_PRECISION:
                for callback in self._periodic_callbacks:
                    callback()
                self._last_time = now

    def __del__(self):
        self._impl.close()

从构造方法可知,SS支持select库提供的3种I/O多路复用实现:select、epoll和kqueue(相当于epoll在BSD中的实现)。其中select和kqueue都单独封装了对应的实现类,epoll没有,我们就以epoll为例来简单看看。

EventLoop类中采用一个字典_fdmap保存fd与其事件处理逻辑的映射,即[fd, (f, handler)]。其add()、remove()、modify()方法则是代理了epoll.register()/unregister()/modify()方法,对应的系统调用是epoll_ctl()。poll()方法代理了epoll.poll()方法,对应的系统调用是epoll_wait(),可以设定超时。

除了正常的事件处理逻辑之外,它也支持周期性的回调逻辑,保存在_periodic_callbacks结构中,用add_periodic()和remove_periodic()方法可以添加或删除。

EventLoop的核心方法就是run()方法,它是一个无限循环,执行以下操作:

  1. 调用poll()方法等待事件的触发。
  2. 如果发生EPIPE异常,说明fd不可操作,连接可能已经关闭;如果发生EINTR异常,说明系统调用被信号中断。这两种情况都需要将asap标志位设为True,立即回调。
  3. poll()方法返回,从其返回的就绪列表中,取得fd对应的处理逻辑handler,并调用handle_event()方法进行处理。
  4. 如果设置了asap标志位,或者回调周期已到,就调用_periodic_callbacks中的各个回调函数。

在SS Server的源码server.py中,初始化EventLoop的方法如下。

    def run_server():
        # 略去...
        try:
            loop = eventloop.EventLoop()
            dns_resolver.add_to_loop(loop)
            list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))

            daemon.set_user(config.get('user', None))
            loop.run()
        except Exception as e:
            shell.print_exception(e)
            sys.exit(1)

可见,在创建了EventLoop对象之后,会将早先创建好的异步DNS解析器dns_resolver、TCP转发tcp_servers、UDP转发udp_servers绑定到EventLoop,这三个组件都围绕EventLoop实现了事件驱动逻辑。来看看TCPRelay.add_to_loop()方法。

    def add_to_loop(self, loop):
        if self._eventloop:
            raise Exception('already add to loop')
        if self._closed:
            raise Exception('already closed')
        self._eventloop = loop
        self._eventloop.add(self._server_socket,
                            eventloop.POLL_IN | eventloop.POLL_ERR, self)
        self._eventloop.add_periodic(self.handle_periodic)

其中,POLL_IN对应EPOLLIN宏,表示fd上有数据可读的事件,POLL_ERR则对应EPOLLERR宏,表示fd上有错误发生。同理,POLL_OUT则表示fd上有数据可写。

事件处理器也由分别的类来实现,例如TCPRelayHandler.handleEvent()方法。

    def handle_event(self, sock, event):
        if self._stage == STAGE_DESTROYED:
            logging.debug('ignore handle_event: destroyed')
            return
        if sock == self._remote_sock:
            if event & eventloop.POLL_ERR:
                self._on_remote_error()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                self._on_remote_read()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & eventloop.POLL_OUT:
                self._on_remote_write()
        elif sock == self._local_sock:
            if event & eventloop.POLL_ERR:
                self._on_local_error()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                self._on_local_read()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & eventloop.POLL_OUT:
                self._on_local_write()
        else:
            logging.warn('unknown socket')

可见,该方法先检查传入的sock是远程sock还是本地sock,然后通过位运算检查对应的epoll事件类型,进而调用远程或本地的读写或异常处理方法。

最后,调用EventLoop.run()方法启动之,SS Server就可以开始处理SS Local发来的请求了。

作者:LittleMagic