I/O复用

c/c++

浏览数:126

2019-6-30

  内核(操作系统)一旦发现进程指定的一个或者多个IO条件准备读或者准备写的时候,就会给该进程发一个通知。当服务端要处理多个套接字文件描述符的时候,这个时候可以采用IO复用,操作系统发现哪些套接字文件描述符可读或可写的时候,就会通知相应的进程才去执行对应的read(保证文件描述符对应的地址有可用的数据返回,而不是由于试探性的返回无用的值)或write操作。

可以举个例子:

  例如:现在李老师收取刚刚布置给学生要默写在纸上的古诗的作业。

  第一种情况:李老师按照学号的顺序来收取,并且会等待将要收取作业的同学同意提交作业,直到该同学提交作业,才会去下一个学号的同学那里去询问是否提交作业。(循环处理每个socket,不支持高并发,效率低)

  第二种情况:李老师向其他老师请求帮助,拉来了很多老师帮忙收作业,每个老师处理一小部分学生的古诗词作业的提交任务。(相当与创建多个进程或者线程处理socket)

  第三种情况:李老师站在讲台上,根据同学们的反应来做出相应的动作(如果谁的要提交作业,该同学就举手),某些同学举手后,李就会去收取这些同学的作业。(IO复用)

系统调用函数实现IO复用

  将多个文件描述符集中到一起统一监视。比如对多个套接字进行统一管理与调度 。

1.select函数

  函数会做的事情包括:

  • 检测是否存在套接字接受数据

  • 检测是否存在套接字无阻塞的传输数据

  • 哪些套接字发生了异常

该函数的调用时的顺序

1. 设置文件描述符

将需要监视的文件描述符集中到一起(fd_set),集中的时候要按照监视项来区分(包括接收,传输,异常)。fd_set结构体如下:

/* The fd_set member is required to be an array of longs.  */
typedef long int __fd_mask;
/* Number of descriptors that can fit in an `fd_set'.  */
#define __FD_SETSIZE        1024
/* It's easier to assume 8-bit bytes than to get CHAR_BIT.  */
#define __NFDBITS   (8 * (int) sizeof (__fd_mask))
/* fd_set for select and pselect.  */
typedef struct
  {
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
  } fd_set;

fd_set结构如下(有三种情况,监听接收作用的fd_set,监听传输作用的fd_set,监听异常作用的fd_set),当对应的区上的位置的值0被置为1,表示该位置对应的文件描述符正在被监视,或可读或可写,亦或者是有异常:

 在fd_set中注册文件描述符或者更改值的操作都是由相关宏来完成的。

/* Access macros for `fd_set'.  */
#define FD_SET(fd, fdsetp)  __FD_SET (fd, fdsetp)  
#define FD_CLR(fd, fdsetp)  __FD_CLR (fd, fdsetp)  
#define FD_ISSET(fd, fdsetp)    __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp)     __FD_ZERO (fdsetp)  
​
FD_ZERO(fd_set* fdset);           //将fd_set的所有位都初始化为0 
FD_SET(int fd, fd_set* fdset);    //在fd_set中注册文件描述符fd的信息
FD_CLR(int fd, fd_set* fdset);    //从参数fd_set中清除文件描述符fd的信息
FD_ISSET(int fd, fd_set* fdset);  //若参数fd_set所指向的变量包含文件描述符fd的信息,则返回1,否则返回0/* Access macros for `fd_set'.  */
#define FD_SET(fd, fdsetp)  __FD_SET (fd, fdsetp)  
#define FD_CLR(fd, fdsetp)  __FD_CLR (fd, fdsetp)  
#define FD_ISSET(fd, fdsetp)    __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp)     __FD_ZERO (fdsetp)  
​
FD_ZERO(fd_set* fdset);           //将fd_set的所有位都初始化为0 
FD_SET(int fd, fd_set* fdset);    //在fd_set中注册文件描述符fd的信息
FD_CLR(int fd, fd_set* fdset);    //从参数fd_set中清除文件描述符fd的信息
FD_ISSET(int fd, fd_set* fdset);  //若参数fd_set所指向的变量包含文件描述符fd的信息,则返回1,否则返回0

2.指定监视范围和超时

  select函数的原型如下:

/* Check the first NFDS descriptors each in READFDS (if not NULL) for read
   readiness, in WRITEFDS (if not NULL) for write readiness, and in EXCEPTFDS
   (if not NULL) for exceptional conditions.  If TIMEOUT is not NULL, time out
   after waiting the interval specified therein.  Returns the number of ready
   descriptors, or -1 for errors.
​
   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int select (int __nfds, fd_set *__restrict __readfds,
           fd_set *__restrict __writefds,
           fd_set *__restrict __exceptfds,
           struct timeval *__restrict __timeout);

第一个参数:监视对象文件描述符的数量

第二个参数:传递包含所有关注“是否存在待读取”的文件描述符的fdset。

第三个参数:传递包含所有关注“是否可传输无阻塞数据”的文件描述符的fdset 。

第三个参数:传递包含所有关注“是否可发生异常”的文件描述符的fdset 。

第四个参数:为防止陷入无限阻塞的状态, 传递超时信息。 

3. 调用select函数

  调用函数返回结果:

    • -1:发送错误时返回-1。

    • 0:超时返回0。

    • >0:返回发生时间的文件描述符。

4. 调用select查看结果

  select函数返回值如果是大于0的整数,表示相应数量的文件描述符发生了变化。如下图示例,

  调用函数select函数时,向其传递的fd_set变量将发送变化,所有的1都被为0,但发生变化变化的文件描述符对应的位除外,如图,调用select函数结束后,可知传入的fd_set中只有fd1和fd3是1,即它们对应的文件描述符发生了变化。

进一步理解select函数

  理解select模型的关键在于理解fd_set(这里声明的变量名为reads)这个数据结构,现在假设fd_set的大小是1字节,即8个bit(位)。执行过程可以这样表示:

  • 第一步:FD_ZERO(&reads); 将read指向的fd_set初始化为00000000。

  • 将对应位设置处于监听状态,如fd=6,FD_SET(fd, &reads);此时fd_set变为00100000。

  • 如果还有需要监听的文件描述符,fd1=1,fd2=2,通过FD_SET后结果变为00100011。

  • 调用select(7,reads,0,0,timeval)阻塞等待。默认是从位置0开始,所以要将最大的fd_max+1。

  • 如果此时fd=2对应的文件描述服发生了可读事件,select调用结束,此时fd_set对应的值是00000010,没有事件发生所对应的位将被清空。

示例程序

  利用select监听键盘输入操作:

#include <iostream>
#include <list>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/select.h>
using namespace std;
#define BUF_SIZE 30
​
int main(){
    fd_set reads, temps;
    int result, str_len;
    char buf[BUF_SIZE];
    struct timeval timeout;
​
    FD_ZERO(&reads);
    FD_SET(0, &reads);  //0-该位置是控制台的标准输入
​
    while (1) {
        temps = reads;
        timeout.tv_sec = 5; //秒
        timeout.tv_usec = 0; //微秒
        result = select(1, &temps, 0, 0, &timeout);
        if(result == -1){
            puts("select() error...");
            break;
        }
        else if(result == 0){
            puts("Time wait...");
        }else{
            if(FD_ISSET(0, &temps)){    //fd_set指向的变量中包含文件描述符的信息,返回真
                str_len = read(0, buf, BUF_SIZE);
                buf[str_len] = 0;
                printf("message from console: %s.", buf);
            }
        }
​
    }
​
    return 0;
}
​

select模型的特点

  • 可监控的文件描述符的数量与机器对应的fd_set大小有关,即sizeof(fd_set);

  • 将fd_set传入到select函数调用前,还需要一个fd_set结构存储源数据,用于和调用select函数后的fd_set进行逐位对比,如果有事件发生,则通过FD_ISSET返回;如果原来标记为1,处于监听的文件描述符但没有事件发生,此时会将其置为0;。如上面示例程序的reads和temps。

  • select上接收到普通数据或者带外数据会使select返回不同的就绪状态,普通数据触发可读状态,带外数据触发异常状态。

如下,是一个I/O复用的服务端的案例用来解决多客户端请求的问题:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include<sys/socket.h>
​
#define BUF_SIZE 100
void error_handling(char* message);
​
int main(int argc, char *argv[]){
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    struct timeval timeout;
    fd_set reads, cpy_reads;
​
    socklen_t adr_sz;
    int fd_max, str_len, fd_num, i;
    char buf[BUF_SIZE];
    if(argc != 2){
        printf("Usage : %s <port>\n", argv[0]);
        exit(1);
    }
​
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_port = htons(atoi(argv[1]));
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
​
    if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1){
        error_handling("bind() error.........");
    }
​
    if(listen(serv_sock, 5) == -1){
        error_handling("listen error.........");
    }
​
    FD_ZERO(&reads);
    FD_SET(serv_sock, &reads);
    fd_max = serv_sock;
​
    while(1){   //无限循环中调用select
        cpy_reads = reads;
        timeout.tv_sec = 5;
        timeout.tv_usec = 500;
​
        if((fd_num = select(fd_max+1, &cpy_reads, 0, 0, &timeout)) == -1)
            break;
​
        if(fd_num == 0)
            continue;
​
        for(i = 0; i < fd_max+1; i++){  //遍历观察那些文件描述符发生了变化
            if(FD_ISSET(i, &cpy_reads))  //观察fd_set中位发生变化
            {
                if(i == serv_sock){     //如果是连接请求
                    adr_sz = sizeof(clnt_adr);
                    clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
                    FD_SET(clnt_sock, &reads);
                    if(fd_max < clnt_sock)  //如果clnt_sock对应fd_set中位置大于原先设定的需要监听的范围,则修改监听范围。
                        fd_max  = clnt_sock;
                    printf("Connected client : %d \n", clnt_sock);
                }
                else{   //某些套接字文件描述符指向的信息发生了改变,即收到通知,该文件描述符现在可读
                    str_len = read(i, buf, BUF_SIZE);
                    if(str_len == 0){
                        FD_CLR(i,&reads);
                        close(i);
                        printf("close client:%d \n", i);
                    }else{
                        write(i, buf, str_len);
                    }
                }
            }
        }
    }
    close(serv_sock);
    return 0;
}
​
void error_handling(char* message){
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

2.epoll

  epoll是在2.5.44版内核中提出的(在使用前,应该验证一下内核版本,现在大部分内核版本都在2.6以上,可以通过cat /proc/sys/kernel/osrelease查看),而且epoll方式只在linux下体统支持。关于epoll实现的三个函数:

#include <sys/epoll.h>
int epoll_create(int size); 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll_create

int epoll_create(int size);

通过调用该函数执行成功后创建的文件描述符保存空间被称为“epoll例程”,参数size只是为操作系统提供一个参考需要为epoll例程多大的空间,即size大小并不等于最终的epoll例程大小。

epoll_ctl

  生成epoll例程后,在其内部注册监视对象文件描述符时需要用到epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  参数说明:

  • epfd 用于注册监视对象的epoll实例(文件描述符)

  • op 指定监视对象的添加,删除或修改等操作

  • fd 需要监视对象的文件描述符

  • event 监视对象的事件类型

op可以有3个值,分别为:

  • EPOLL_CTL_ADD : 添加监听的事件

  • EPOLL_CTL_DEL : 删除监听的事件

  • EPOLL_CTL_MOD : 修改监听的事件

epoll_ctl(A, EPOLL_CTL_ADD, B, C)表示在epoll例程A中注册文件描述符B用于监视参数C中的事件。

  epoll_event的结构体如下:

struct epoll_event {
    __uint32_t   events; /* Epoll events */
    epoll_data_t data;   /* User data variable 根据用户需求定制 */
};
typedef union epoll_data {
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;

epoll_event的event中保存的常量及其对应的具体时间类型

  • EPOLLERR : 文件上发上了一个错误。这个事件是一直监控的,即使没有明确指定

  • EPOLLHUP : 文件被挂断。这个事件是一直监控的,即使没有明确指定

  • EPOLLRDHUP : 对端关闭连接或者shutdown写入半连接

  • EPOLLET : 开启边缘触发,默认的是水平触发,所以我们并未看到EPOLLLT

  • EPOLLONESHOT : 一个事件发生并读取后,文件自动不再监控

  • EPOLLIN : 文件可读

  • EPOLLPRI : 文件有紧急数据可读

  • EPOLLOUT : 文件可写

  • EPOLLWAKEUP : 如果EPOLLONESHOT和EPOLLET清除了,并且进程拥有CAP_BLOCK_SUSPEND权限,那么这个标志能够保证事件在挂起或者处理的时候,系统不会挂起或休眠

如下示例代码展示其过程

struct epoll_event event;
event.events = EPOLLIN; //发生需要读取数据的情况时
event.data.fd = sockfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,event);

如果epoll_ctl方法返回-1,则标志出现了问题,我们可以读取errno来定位错误,有如下errno会被设置:

  • EBADF : epfd或者fd不是一个有效的文件描述符

  • EEXIST : op为EPOLL_CTL_ADD,但fd已经被监控

  • EINVAL : epfd是无效的epoll文件描述符

  • ENOENT : op为EPOLL_CTL_MOD或者EPOLL_CTL_DEL,并且fd未被监控

  • ENOMEM : 没有足够的内存完成当前操作

  • ENOSPC : epoll实例超过了/proc/sys/fs/epoll/max_user_watches中限制的监听数量

epoll_wait

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

参数说明:

  • epfd 用于注册监视对象的epoll实例(文件描述符)

  • events 保存发生事件的文件描述符集合的结构体地址值(数组首地址)

  • maxevents 保存的最大事件数量

  • timeout 等待时间(毫秒),-1表示一直等待事件的发生

该函数的作用类似与select函数。该函数被调用后,返回发生事件的文件描述符数,同时,第二个参数保存发生事件的文件描述符集合。此时,就不需要向像select那样针对所有文件描述符进行循环扫描,确定发生事件的文件描述符。

工作模式

  epoll对文件描述符的操作有两种模式:LT(level trigger 条件触发)和ET(edge trigger 边缘触发)。LT模式是默认模式,LT模式与ET模式的区别可以通过TCP/IP网络编程书中的案例进行解释:

  1. LT 水平触发

  • 儿子:“妈妈,我收到了5000元压岁钱。”

  • 妈妈:“恩,省着点花!”

  • 儿子:“妈妈,我今天买了个ipad,花了3000元。”

  • 妈妈:“噢,这东西真贵。”

  • 儿子:“妈妈,我今天买好多吃的,还剩1000元。”

  • 妈妈:“用完了这些钱,我可不会再给你了。”

  • 儿子:“妈妈,那1000元我没花,零花钱够用了。”

  • 妈妈:“恩,这才是明智的做法!”

  • 儿子:“妈妈,那1000元我没花,我要攒起来。”

  • 妈妈:“恩,加油!”

  只要儿子手中还有钱,他就会一直汇报,这就是LT模式。有钱就是1,没钱就是0,那么只要儿子还有钱,这种事件就是1->1类型事件,自然是LT。将案例中儿子钱包换成输入缓冲区,压岁钱换成输入数据,在条件触发中,只要输入缓冲区有数据,将将会以事件的方式再次注册。

  1. ET 边缘触发

  • 儿子:“妈妈,我收到了5000元压岁钱。”

  • 妈妈:“恩,省着点花!”

  • 儿子:“……”

  • 妈妈:“你倒是说话啊?压岁钱呢?!”

  儿子从没钱到有钱,是一个0->1的过程,因此为ET。儿子和妈妈说过自己拿到了压岁钱就完事了,至于怎么花钱,还剩多少钱,一概不说。可以看出,边缘触发中输入缓冲区中收到数据时仅注册一次,即使输入缓冲区中还有数据,也不会再次注册。

示例程序

  基于socket的客户端和服务端利用epoll来处理I/O复用问题:

服务端程序:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char* message);
​
int main(int argc, char *argv[]){
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
​
​
    socklen_t adr_sz;
    int  str_len, i;
    char buf[BUF_SIZE];
    if(argc != 2){
        printf("Usage : %s <port>\n", argv[0]);
        exit(1);
    }
​
    struct epoll_event* ep_events;
    struct epoll_event event;
    int epfd, event_cnt;
​
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_port = htons(atoi(argv[1]));
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
​
    if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1){
        error_handling("bind() error.........");
    }
​
    if(listen(serv_sock, 5) == -1){
        error_handling("listen error.........");
    }
​
    epfd = epoll_create(EPOLL_SIZE); //返回创建的epoll文件描述符
    ep_events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
​
    event.events = EPOLLIN; //发生读取事件的时候
    event.data.fd = serv_sock;
    epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
​
    while(1){
        event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
​
        if(event_cnt == -1){
            error_handling("epoll_wait() failed...");
        }
​
        puts("return epoll_wait");
        for(i = 0; i < event_cnt; i++){
            if(ep_events[i].data.fd == serv_sock){  //处理新进的连接请求
                adr_sz = sizeof(clnt_adr);
                clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
                event.data.fd = clnt_sock;
                event.events= EPOLLIN;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
                printf("Connected client: %d \n", clnt_sock);
            }else{
                str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
                if(str_len == 0){   // 关闭客户端连接
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); //将读取完的连接取消监听
                    close(ep_events[i].data.fd);    //关闭客户端连接
                    printf("close client: %d \n", ep_events[i].data.fd);
                }else{
                    write(ep_events[i].data.fd, buf, str_len);
                }
            }
        }
    }
    close(serv_sock);
    close(epfd);
    return 0;
}
​
void error_handling(char* message){
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

客户端程序:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
using namespace std;
​
#define BUF_SIZE 1024
void errorhandling(char *message);
​
int main(int argc, char *argv[])
{
    int sock;
    struct sockaddr_in serv_addr;
    char message[BUF_SIZE];
    int str_len = 0 ,idx = 0, read_len = 0;
​
    if(argc != 3){
        printf( "Usage : %d <IP> <port> ", argv[0]);
        exit(0);
    }
​
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1){
        errorhandling("socket() error;");
    }
​
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));
​
    if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1){
        errorhandling("connect error!");
    }else{
        printf("connected.....\n");
    }
​
    while(1){
        fgets(message, BUF_SIZE, stdin);
        fflush(stdin);
        if(!strcmp(message, "q\n") || !strcmp(message, "Q\n"))
            break;
        write(sock, message, strlen(message));
         memset(message, 0, sizeof(message));
        str_len = read(sock, message, BUF_SIZE - 1);
        message[str_len] = '\0';
        printf("Message from server : %s", message);
    }
​
    close(sock);
    return 0;
}
​
void errorhandling(char *message){
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

参考资料

TCP/IP网络编程([韩] 尹圣雨 )

I/O多路复用技术(multiplexing)是什么?

Linux编程之select

IO多路复用之epoll总结

作者:一盏淡酒、醉了夕阳