【Linux】I/O多路复用-SELECT/POLL/EPOLL

23

I/O多路复用

前言

  • 文本相关参考资料及部分内容来源
    • 《Linux高性能服务器编程》
    • 《TCP/IP网络编程》
    • 《Linux/UNIX系统编程手册》

  • I/O多路复用核心思想为,使用一个线程,来处理多个客户端的请求。
  • 或者说,使用一个特殊的fd,监视多个fd。
  • 使得程序能同时监听多个文件描述符,这对提高程序的性能至关重要。

通常网络程序在下列情况下需要使用I/O多路复用技术

  • 客户端程序需要同时处理多个socket。
  • 客户端程序要同时处理用户输入和网络连接。
  • TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合
  • 服务器要同时处理TCP请求和UDP请求。
  • 服务器要同时监听多个端口,或处理多种服务。

select

select-函数

  • 在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

  • select

  • 函数原型:

int select(
           int nfds, 
           fd_set *readfds, 
           fd_set *writefds, 
           fd_set *exceptfds, 
           struct timeval *timeout
);
  • 参数:
    • nfds: 被监听的文件描述符的总数。
    • readfds: 将所有关注"是否存在待读取数据"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。-fd_set文件描述符集合,注意传递记得备份,因为调用select后会将其重置
    • writefds: 将所有关注"是否存在无阻塞数据(可写入)"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。
    • exceptfds: 将所有关注"是否发生异常"的文件描述符注册到fd_set型变量中,并传递其地址值。
    • timeout: 用来设定select的阻塞时间上限。
    • 指定为NULL将会一直阻塞,直到某个文件描述符就绪。
    • 指定为一个timeval结构体,详见timeval结构体
  • 返回值:
    • -1: 表示发生错误。
    • 0: 表示超时。
    • >0: 表示有一个或多个文件描述符已达到就绪态,返回值表示处于就绪态的文件描述符个数。[三个集合中就绪的fd数量总和,也就是说,如果一个fd在三个fd_set数组中,三种事件都就绪了,会存在重复累计(对于同一个fd来说)。]

timeval-结构体

  • 结构体定义
struct timeval{
    long tv_sec; // 秒
    long tv_usec; // 微秒
};
  • 如果结构体 timeval 的两个域都为 0 的话,此时 select()不会阻塞,它只是简单地轮询指定的文件描述符集合,看看其中是否有就绪的文件描述符并立刻返回。
  • 否则,timeout 将为 select指定一个等待时间的上限值。

fd_set-文件描述符集合

  • 在fd_set变量中各注册或更改值的操作都由以下四个宏完成。

  • 将fdset所指向的文件描述符集合初始化为空。

void FD_ZERO(fd_set *fdset); 
  • 将文件描述符fd,从fdset所指向的文件描述集合中移除。
void FD_CLR(int fd, fd_set *fdset);
  • 将文件描述符fd,添加到fdset所指向的文件描述集合中。
void FD_SET(int fd, fd_set *fd_set); 
  • 检查指定的文件描述符fd,是否在fdset所指向的文件描述集合中。
    • 存在返回非0,反之返回0。
int FD_ISSET(int fd, fd_set *fdset);

文件描述符就绪条件

  • 以下情况socket可读:
    • socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读取该socket,并且读操作将返回的字节数大于0。
    • socket通信的对方关闭连接。此时对该socket的读操作将返回0。
    • 监听socket上有新的连接请求。
    • socket上有未处理的错误。此时我们可以使用getsockop来读取和清除该错误。
  • 以下情况socket可写:
    • socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
    • socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
    • socket使用非阻塞connect连接成功或失败(超时)之后。
    • socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。
  • 异常情况:
    • 网络程序中,select能处理的异常情况只有一种: socket上接收到带外数据。

示例

  • server.c
#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <sys/time.h> 
#include <sys/ioctl.h> 
#include <unistd.h> 
#include <stdlib.h>
#include <errno.h>

int errno;
int main(void){
    int server_sockfd,client_sockfd;
    int server_len,client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    //两个文件描述符集合
    fd_set readfds,testfds;//readfds用于检测输出是否就绪的文件描述符集合

    server_sockfd = socket(AF_INET,SOCK_STREAM,0); // 建立服务端socket
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
    listen(server_sockfd,5);// 监听队列最多容纳5个

    FD_ZERO(&readfds);// 清空置0
    FD_SET(server_sockfd,&readfds);// 将服务端socket加入到集合中

    /*
    int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
    */ 

    while(1){
        char ch;
        int fd;
        int nread;
        testfds = readfds;//相当于备份一份,因为调用select后,传进去的文件描述符集合会被修改。
        struct timeval my_time;
        my_time.tv_sec = 2;
        my_time.tv_usec = 0;
        printf("server waiting\n");
        // 监视server_sockfd与client_sockfd
        //result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,(struct timeval* )0); //无限期阻塞,并测试文件描述符变动
        result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,&my_time); //根据my_time中设置的时间进行等待,超过继续往下执行。
        if(result < 0){//有错误发生
            perror("select errno"); 
            exit(1);
        }else if(result == 0){//超过等待时间,未响应    
            FD_ZERO(&readfds);// 清空置0
            FD_SET(server_sockfd,&readfds);// 将服务端socket重新加入到集合中
            printf("no connect request \n");
            continue;//没有响应的就别下去遍历了
        }
        //扫描所有的文件描述符(遍历所有的文件句柄),是件很耗时的事情,严重拉低效率。
        for(fd = 0;fd<FD_SETSIZE;fd++){
            //找到相关文件描述符,判断是否在testfds这个文件描述符集合中。 
            if(FD_ISSET(fd,&testfds)){
                //判断是否为服务器套接字,是则表示为客户端请求连接
                if(fd == server_sockfd){
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
                    FD_SET(client_sockfd,&readfds);//将客户端socket加入到集合中,用来监听是否有数据来。
                    printf("adding client on fd %d\n",client_sockfd);;
                }else{// 客户端来消息了
                    //获取接收缓存区中的字节数
                    ioctl(fd,FIONREAD,&nread);//即获取fd来了多少数据

                    //客户端数据请求完毕,关闭套接字,并从集合中清除相应的套接字描述符
                    if(nread ==0){
                        close(fd);
                        FD_CLR(fd,&readfds);//去掉关闭的fd
                        printf("removing client on fd %d\n", fd);
                    }else{//处理客户数请求
                        read(fd,&ch,1);
                        sleep(5);
                        printf("serving client on fd %d\n", fd);
                        ch++;
                        write(fd, &ch, 1);
                    }
                }
            }
        }
    }
    return 0;
}
  • client.c
#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <stdlib.h>
#include <sys/time.h>

int main(){

    int client_sockfd;
    int len;
    struct sockaddr_in address;//服务器端网络地址结构体 
    int result;
    char ch = 'A';
    client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket 
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = inet_addr("127.0.0.1");
    address.sin_port = htons(9000);
    len = sizeof(address);
    result = connect(client_sockfd, (struct sockaddr*)&address, len);
    if (result == -1){
        perror("oops: client2");
        exit(1);
    }
    //第一次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the first time: char from server = %c\n", ch);
    sleep(5);

    //第二次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the second time: char from server = %c\n", ch);

    close(client_sockfd);

    return 0;
}


poll

poll函数

  • 与select类似,也是在指定事件内轮询一定的数量的文件描述符,看其中是否有就绪的。

  • poll

  • 函数原型:

int poll(
         struct pollfd *fds, 
         nfds_t nfds, 
         int timeout
);
  • 参数:
    • fds: pollfd类型的数组,它存储所有我们该兴趣的文件描述符上发生的可读、可写和异常事件。结构体定义详见pollfd结构体
    • nfds: 数组fds中的元素个数,类型为nfds_t无符号整型。
    • timeout: 超时等待时间。
    • -1: 一直阻塞,直到某个事件发生。
    • 0: 调用后不等待立即返回。
  • 返回值:
    • -1: 表示发生错误。
    • 0: 表示超时。
    • >0: 表示fds中有这么多个文件描述符处于就绪态了。即fds中拥有非零revents字段的pollfd结构体数量。

pollfd-结构体

  • 结构体定义:
struct pollfd {
    int fd;          
    short events;     
    short revents;   
};
  • 参数:
    • fd: 文件描述符
    • events: 注册的事件。如下图所示。
    • revents: 实际发生的事件,由内核填充


示例

  • server.c
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>

#define MAX_FD  8192 //最大文件标识符
struct pollfd  fds[MAX_FD];
int cur_max_fd = 0;     //当前要监听的最大文件描述符+1,减少要遍历的数量。

int main(void){

    int server_sockfd,client_sockfd;
    int server_len,client_len;
    struct sockaddr_in server_address,client_address;
    int result;
    server_sockfd = socket(AF_INET,SOCK_STREAM,0);//服务端socket
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
    listen(server_sockfd,5);

    //添加待监测文件描述符到fds数组中
    fds[server_sockfd].fd = server_sockfd;
    fds[server_sockfd].events = POLLIN;
    fds[server_sockfd].revents = 0;

    if(cur_max_fd <= server_sockfd){
        cur_max_fd = server_sockfd+1;
    }

    while(1){
        char ch;
        int i,fd;
        int nread;
        printf("server waiting\n");

        result = poll(fds,cur_max_fd,1000);
        if(result <0){
            perror("server5");
            exit(1);
        }else if(result == 0){
            printf("no connect,end waiting\n");
        }else{//大于0,返回的是fds中处于就绪态的文件描述符个数。

        }
        //扫描文件描述符
        for(i = 0; i < cur_max_fd;i++){
            if(fds[i].revents){//有没有结果,没有结果说明该文件描述符上还未发生事件。
                fd= fds[i].fd;
                //判断是否为服务器套接字,是则表示为客户端请求连接
                if(fd == server_sockfd){
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
                    fds[client_sockfd].fd = client_sockfd;
                    fds[client_sockfd].events = POLLIN;
                    fds[client_sockfd].revents = 0;
                    if(cur_max_fd <= client_sockfd){
                        cur_max_fd = client_sockfd + 1;
                    }
                    printf("adding client on fd %d\n",client_sockfd);
                }else{//客户端socket中有数据请求
                    if(fds[i].revents & POLLIN){//读
                        nread = read(fd,&ch,1);
                        if(nread == 0){
                            close(fd);
                            memset(&fds[i],0,sizeof(struct pollfd));
                            printf("removing client on fd %d\n",fd);
                        }else{//写
                            sleep(5);
                            printf("serving client on fd %d,receive: %c\n",fd,ch);
                            ch++;
                            fds[i].events = POLLOUT;//添加一个写事件监听
                        }
                    }else if(fds[i].revents & POLLOUT){//写
                        write(fd,&ch,1);
                        fds[i].events = POLLIN;
                    }

                }
            }
        }

    }
    return 0;
}
  • client.c——同select中的实例。


epoll

  • epoll是Linux特有的I/O复用函数。它在实现和使用上与select和poll有很大的差异。
  • epoll使用一组函数来完成任务,而不是单个函数
  • epoll把用户关心的文件描述符上的事件放在内核的一个事件表中,不需要像select与poll那样每次都要重复传入文件描述符集合或是事件集。epoll需要使用一个额外的文件描述符,在内核中唯一标识这个事件表

epoll_create-创建epoll

  • epoll_create
  • 功能: 创建一个epoll实例。
  • 函数原型:
int epoll_create(int size); 
  • 参数:
    • size: 现已被抛弃,只是给内核一个提示,告诉它事件表需要多大。
  • 返回值: 返回创建的epoll实例文件描述符,在其它epoll相关函数中指定要访问的内核事件表

epoll_ctl-操作对应内核事件表

  • epoll_ctl
  • 功能: 操作对应epoll的内核事件表,进行添加/删除/修改指定fd的事件。
  • 函数原型:
int epoll_ctl(
  int epfd, 
  int op, 
  int fd, 
  struct epoll_event *event
); 
  • 参数:
    • epfd: epoll实例,用来指定要访问的内核事件表。
    • op: 用来指定需要执行的操作。
    • EPOLL_CTR_ADD: 往事件表上注册fd上的事件。
    • EPOLL_CTR_MOD: 修改fd上的注册事件。
    • EPOLL_CTR_DEL: 删除fd上的注册事件。
    • fd: 要进行op操作的文件描述符。
    • event: 为一个指向epoll_event结构体的指针。结构体定义如下event_event-结构体所示:
  • 返回值:
    • 成功: 返回0。
    • 失败: 返回-1并设置errno。

epoll_event-结构体

  • 结构体定义:
struct epoll_event{
    uint32_t events;
    epoll_data_t data;
};

image-20220913201232442


epoll_data_t-结构体

  • 结构体定义:
typedef union epoll_data{
    void *ptr;    // 用户自定义使用
    int fd;      // 指定事件文件描述符
    uint32_t u32;
    uint64_t u64;
}epoll_data_t;
  • 参数:
    • ptr: 可用来指定与fd相关的用户数据。
    • fd: 指定事件所从属的目标文件描述符。
  • 注意:
    • epoll_data_t是一个联合体,所以我们只能使用fd或ptr其中一个成员。
    • 如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,可以放弃使用epoll_data_t中的fd成员,而是在ptr所指向的自定义用户数据中包含fd。
  • 补充:
    • 当我们调用epoll_wait后,evlist数组中的epoll_event每个data参数为我们在一开始(即调用epoll_ctl)所指定的内容,比如像上面所说的我们指定了自定义数据ptr,最终某一fd产生了我们监视的事件,我们可以在其对应的epoll_event的data中取到。例如下方epoll-简易web服务器中的_ConnectStat结构体。

epoll_wait-事件等待

  • epoll_wait
  • 功能: 在一段超时时间内等待一组文件描述符上的事件。
  • 函数原型:
int epoll_wait(int epfd, 
               struct epoll_event * evlist, 
               int maxevents, 
               int timeout
); 
  • 参数:
    • epfd: epoll文件描述符,指定内核事件表。
    • evlist: 分配好的epoll_event结构体数组,epoll将会把发生的事件复制到evlist数组中。
    • maxevents: 最多监听多少个时间,必须大于0。
    • timeout: 表示在没有检测到事件发生时最多等待的时间(ms)。
    • 0: 将会立即返回,不会等待。
    • -1: 表示无限期阻塞,直到有事件发生。
    • >0: 阻塞(等待)时间。
  • 返回值:
    • 成功: 返回就绪的文件描述符个数。
    • 失败: 返回-1并设置errno。

epoll-简易web服务器

  • 使用epoll的一个简易web服务器

头文件

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>

自定义保存数据的结构体

//因为下面的函数指针所以单独拿出来声明typedef
typedef struct _ConnectStat  ConnectStat;

typedef void(*response_handler) (ConnectStat * stat);

// 保存自定义数据的结构体,调用epoll时用epoll_data_t中的ptr存储
struct _ConnectStat {
    int fd;                     //文件描述符
    char name[64];              //姓名
    char  age[64];              //年龄
    struct epoll_event _ev;     //当前文件句柄对应epoll事件
    int  status;                //0-未登录,1-已登录
    response_handler handler;   //不同页面的处理函数
};

相关函数声明与全局变量

// 初始化一个自定义数据存储结构体
ConnectStat * stat_init(int fd);

// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);

// 请求响应-指定对应的处理函数
void do_http_respone(ConnectStat * stat);

// 处理http请求
void do_http_request(ConnectStat * stat);

// 响应处理函数——请求链接返回的内容
void welcome_response_handler(ConnectStat * stat);

// 响应处理函数——commit后返回的内容
void commit_respone_handler(ConnectStat * stat);

// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);

// 创建一个监听套接字 - 略
int startup(char* _ip, int _port);

// 将fd-设置为非阻塞状态,即给指定fd添加状态
void set_nonblock(int fd);

// 打印信息提示ip:port
void usage(const char* argv);

// 响应头
const char *main_header = "HTTP/1.0 200 OK\r\nServer: Xuanxuan Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";

static int epfd = 0;// epoll文件描述符,对应一张内核事件表

初始化一个自定义数据存储结构体

// 初始化自定义数据存储结构体
ConnectStat * stat_init(int fd) {
    ConnectStat * temp = NULL;
    temp = (ConnectStat *)malloc(sizeof(ConnectStat));

    if (!temp) {
        fprintf(stderr, "malloc failed. reason: %m\n");
        return NULL;
    }

    memset(temp, '\0', sizeof(ConnectStat));
    temp->fd = fd; 
    temp->status = 0; 
}

处理http请求

// 解析http请求
void do_http_request(ConnectStat * stat) {

    //读取和解析http 请求
    char buf[4096];
    char * pos = NULL;

    ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
    if (_s > 0){// 读取到数据
        buf[_s] = '\0';
        // printf("receive from client:%s\n", buf);//GET / HTTP/1.1
        pos = buf;

        //Demo 仅仅演示效果,不做详细的协议解析
        if (!strncasecmp(pos, "GET", 3)) {// 是否为Get请求
            stat->handler = welcome_response_handler;// 设置执行函数
        }else if (!strncasecmp(pos, "Post", 4)) {// 是否为POST请求
            //获取 uri
            //printf("---Post----\n");
            pos += strlen("Post");
            while (*pos == ' ' || *pos == '/') ++pos;

            // POST /commit HTTP/1.1
            if (!strncasecmp(pos, "commit", 6)) {//提交
                int len = 0;

                //printf("post commit --------\n");
                pos = strstr(buf, "\r\n\r\n");//返回第一次出现\r\n\r\n的位置
                char *end = NULL;
                // 拿到姓名与年龄
                if (end = strstr(pos, "name=")) {
                    pos = end + strlen("name=");
                    end = pos;
                    while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9'))  end++;
                    len = end - pos;
                    if (len > 0) {// 将姓名存入自定义结构体中
                        memcpy(stat->name, pos, end - pos);
                        stat->name[len] = '\0';
                    }
                }
                if (end = strstr(pos, "age=")) {
                    pos = end + strlen("age=");
                    end = pos;
                    while ('0' <= *end && *end <= '9')    end++;
                    len = end - pos;
                    if (len > 0) {// 将年龄存入自定义结构体中
                        memcpy(stat->age, pos, end - pos);
                        stat->age[len] = '\0';
                    }
                }
                stat->handler = commit_respone_handler;// 设置响应函数
            }
            else {
                stat->handler = welcome_response_handler;// 设置响应函数
            }
        }
        else {
            stat->handler = welcome_response_handler;// 设置响应函数
        }

        stat->_ev.events = EPOLLOUT; // 修改事件类型
        epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);   //修改,交给eoill监视。
    }else if (_s == 0){// 没有读取到数据,客户端关闭。
        printf("client: %d close\n", stat->fd);
        epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);// 将对应fd从对应epoll的内核事件表中删除
        close(stat->fd);// 关闭套接字
        free(stat); // 释放内存
    }else{// read发生错误
        perror("read");
    }
}

请求响应-根据指定的处理函数

void do_http_respone(ConnectStat * stat) {
    stat->handler(stat);// 调用对应设置的函数
}

响应处理函数——请求链接返回的内容

void welcome_response_handler(ConnectStat * stat) {
    const char * welcome_content = "\
            <html lang=\"zh-CN\">\n\
            <head>\n\
            <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
            <title>This is a test</title>\n\
            </head>\n\
            <body>\n\
            <div align=center height=\"500px\" >\n\
            <br/><br/><br/>\n\
            <h2>Hello World</h2><br/><br/>\n\
            <form action=\"commit\" method=\"post\">\n\
            姓名: <input type=\"text\" name=\"name\" />\n\
            <br/>年龄: <input type=\"password\" name=\"age\" />\n\
            <br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
            <input type=\"reset\" value=\"重置\" />\n\
            </form>\n\
            </div>\n\
            </body>\n\
            </html>";

    char sendbuffer[4096];
    char content_len[64];

    strcpy(sendbuffer, main_header);// 拷贝响应头
    snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
    strcat(sendbuffer, content_len);
    strcat(sendbuffer, welcome_content);
    //printf("send reply to client \n%s", sendbuffer);

    // 写给客户端-即发起请求的浏览器
    write(stat->fd, sendbuffer, strlen(sendbuffer));

    stat->_ev.events = EPOLLIN;      // 修改关心的事件
    //stat->_ev.data.ptr = stat;
    epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}

响应处理函数——commit后返回的内容

void commit_respone_handler(ConnectStat * stat) {
    const char * commit_content = "\
        <html lang=\"zh-CN\">\n\
        <head>\n\
        <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
        <title>This is a test</title>\n\
        </head>\n\
        <body>\n\
        <div align=center height=\"500px\" >\n\
        <br/><br/><br/>\n\
        <h2>欢迎 %s  ,年龄 %s!</h2><br/><br/>\n\
        </div>\n\
        </body>\n\
        </html>\n";

    char sendbuffer[4096];
    char content[4096];
    char content_len[64];
    int len = 0;

    len = snprintf(content, 4096, commit_content, stat->name, stat->age);
    strcpy(sendbuffer, main_header); //响应头
    snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
    strcat(sendbuffer, content_len);
    strcat(sendbuffer, content);
    //printf("send reply to client \n%s", sendbuffer);

    write(stat->fd, sendbuffer, strlen(sendbuffer));

    stat->_ev.events = EPOLLIN; // 修改关心的事件

    epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);// 交给epoll来监视
}

打印信息提示ip:port

void usage(const char* argv){
    printf("%s:[ip][port]\n", argv);
}

将fd-设置为非阻塞状态

void set_nonblock(int fd){
    // 这里的文件状态标志flag即open函数的第二个参数
    int fl = fcntl(fd, F_GETFL);// 获取设置的flag
    fcntl(fd, F_SETFL, fl | O_NONBLOCK);// 设置flag
    // fcntl函数              https://blog.csdn.net/zhoulaowu/article/details/14057799
    // O_NONBLOCK https://blog.csdn.net/cjfeii/article/details/115484558
}

创建一个监听套接字

int startup(char* _ip, int _port){
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0){
        perror("sock");
        exit(2);
    }

    int opt = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in local;
    local.sin_port = htons(_port);
    local.sin_family = AF_INET;
    local.sin_addr.s_addr = inet_addr(_ip);

    if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0){
        perror("bind");
        exit(3);
    }

    if (listen(sock, 5) < 0){
        perror("listen");
        exit(4);
    }
    return sock;    //返回套接字
}

main

#include "epoll_server.h"

int main(int argc, char *argv[]){

    if (argc != 3){//检查输入的参数个数是否正确
        usage(argv[0]);
        exit(1);
    }

    //创建一个server socket
    int listen_sock = startup(argv[1], atoi(argv[2]));      

    //创建epoll
    epfd = epoll_create(256);
    if (epfd < 0){//创建失败
        perror("epoll_create");
        exit(5);
    }

    ConnectStat * stat = stat_init(listen_sock);// 自定义数据存储

    struct epoll_event _ev;     //epoll事件结构体
    _ev.events = EPOLLIN;       //设置关心事件为读事件     
    _ev.data.ptr = stat;        //接收返回值

    //将listen_sock添加到epfd中,关心读事件,有客户端来请求链接
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev);

    struct epoll_event revs[64];//接收返回的产生响应的事件

    int timeout = -1;// -1无限期阻塞
    int num = 0;// 就绪的请求I/O个数

    while (1){
        //检测事件
        switch ((num = epoll_wait(epfd, revs, 64, timeout))){
        case 0:   //监听超时               
            printf("timeout\n");
            break;
        case -1: //出错     
            perror("epoll_wait");
            break;
        default:{   //>0,即返回了需要处理事件的数目
            //拿到对应的文件描述符
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);

            for (int i = 0; i < num; i++){//
                //拿到该fd相关的链接信息
                ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;

                int rsock = stat->fd;//拿到对应的fd,进行如下的判断
                if (rsock == listen_sock && (revs[i].events) && EPOLLIN) {// 有客户端链接
                    int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);

                    if (new_fd > 0){//accept成功
                        printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));   
                        connect_handle(new_fd);// 监听新进来的客户端fd
                    }
                }else {//除server socket 之外的其他fd就绪
                    if (revs[i].events & EPOLLIN){//有数据可读
                        do_http_request((ConnectStat *)revs[i].data.ptr);
                    }else if (revs[i].events & EPOLLOUT){//写
                        do_http_respone((ConnectStat *)revs[i].data.ptr);// 完成响应后会再次关心EPOLLIN事件,等待下一次请求。                    
                    }else{
                    }
                }
            }
        }
        break;
        }
    }
    return 0;
}
  • 运行-访问-示例: ./main 192.168.0.70 8080

image-20220924204711787

image-20220924204731175


LT与ET

  • Level Trigger——水平触发:
    • 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次有调用epoll_wait时,它还会通知你在上次没读写完的文件描述符上继续读写,如果你一直不去读写它,它就会一直通知你。
    • 如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
    • 应用程序可以不立即处理该事件,因为当下一次调用epoll_wait,epoll_wait还会再次向应用程序通告此事件。
    • 设置方式: 默认即水平触发。
  • Edge_triggered(边缘触发):
    • 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你
    • 这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符,很大程度上降低了同一个epoll事件被重复触发的次数。
    • 同时,应用程序应立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件(之后的读写事件就会通知,只是这次的不会了)。
  • 设置方式(epoll):
    • 对应文件描述符上要监听的事件设置为,events |= EPOLLET
    • 同时对该文件描述符设置为非阻塞模式。如上epoll-简易web服务器中所示。

EPOLLONESHOT事件

  • 用途: 保证一个socket连接在任一时刻都只被一个线程处理,从而保证连接的完整性,避免了很多可能的竞态条件。
  • 可能产生的情景: 一个线程(或进程)在读取完某个socket上的数据并开始处理时,在处理的过程中该socket上又有新的数据可读(EPOLLIN被再次触发),此时唤醒另一个线程来读取这些新的数据。于是就出现了两个线程操作一个socket的局面。
  • 使用: 使用epoll_ctrl函数在该socket(文件描述符)上注册EPOLLONESHOT事件。
  • 注意:
    • 注册完EPOLLONESHOT事件的socket一旦被某个线程处理完毕,应立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下次可读时,其EPOLLIN事件能被触发,同时也给其它线程处理这个socket的机会。
    • 用于监听链接请求的Server_socket是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户链接,因为后续的客户链接请求将不再触发Server_socket上的EPOLLIN事件。
    • 如果某一线程处理完成该socket上的请求之后,又在该socket上收到了新的客户请求,该线程将继续接触这个socket。

代码示例

主线程中循环监听事件

while( 1 ){
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )break;

        for ( int i = 0; i < ret; i++ ){
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd ){// 有链接请求接入
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN ){
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                // 创建一个线程去处理
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else printf( "something else happened \n" );
        }
    }

将指定fd上的某一事件注册到对应的内核事件表中

void addfd( int epollfd, int fd, bool oneshot ){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if( oneshot ){
        event.events |= EPOLLONESHOT;// 注册EPOLLONESHOT事件
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );// 设置为非阻塞fd
}

设置fd为非阻塞

int setnonblocking( int fd ){
    int old_option = fcntl( fd, F_GETFL );// 拿到之前对该fd的设置属性
    int new_option = old_option | O_NONBLOCK;// 追加O_NONBLOCK属性
    fcntl( fd, F_SETFL, new_option );// 设置
    return old_option;// 当前示例Demo返回无意义,未使用。
}

线程工作函数

void* worker( void* arg ){
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '\0', BUFFER_SIZE );
    while( 1 ){// 因为是非阻塞的,所以要一次性读光,即要立即处理,因为epoll_wait只会提醒一次。
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 ){
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 ){
            if( errno == EAGAIN ){// 读光啦
                reset_oneshot( epollfd, sockfd );// 重置注册事件
                printf( "read later\n" );
                break;
            }
        }
        else{
            printf( "get content: %s\n", buf );
            // sleep 5秒,模拟数据处理过程
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

重置fd上注册的事件

void reset_oneshot( int epollfd, int fd ){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

三个I/O复用函数的对比

  • select:
    • select的参数类型fd_set,仅仅是个文件描述符集合,因此select需要3个这种类型的参数来区分可读、可写及异常事件。
    • 一方面使得select不能处理更多类型的事件,另一方面内核对fd_set在线修改,导致应用程序下次再调用select前不得不重置这三个fd_set集合。同时我们也需要在使用前进行备份
  • poll:
    • poll参数类型pollfd要聪明一些,将文件描述符和事件类型定义在一起,调用后修改的是pollfd结构体中的revents成员,为实际检测到的事件,我们设置的events成员保持不变。再次调用后,revents会被重新置空。
  • select与poll每次调用后,都需要遍历整个用户关心的事件集合,无论其中的事件是否就绪,所以应用程序检索就绪文件描述符的时间复杂度为O(n)。
  • epoll:
    • epoll使用与上面二者完全不同的方式来管理用户注册事件,它在内核中维护一个事件表,并提供独立的系统调用epoll_ctl来往其中进行添加、删除、修改事件,而无须反复地从用户空间读入这些事件。
  • epoll_wait系统调用的events参数负责保存这些就绪的事件,使得应用程序检索就绪文件描述符的时间复杂度达到O(1)。

  • 最大支持文件描述符个数:
    • poll与epoll_wait分别用nfds和maxevents参数来指定最多监听多少个文件描述符和事件,这两个数值都能达到系统允许打开的最大文件描述符个数——65535。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但是这可能会导致不可预期的后果。
  • 工作模式:
    • select与poll都只能工作在相对低效的LT模式,而epoll可以工作在高效的ET模式。
  • 内核实现:
    • select与poll采用的是轮询的方式,每次扫描整个注册文件描述符集合,将就绪的文件描述符返回给用户程序。检测就绪事件的时间复杂度为O(n)。
    • epoll_wait采用回调的方式,内核检测到就绪的文件描述符时,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。
    • 活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。所以epoll_wait适用于连接数量多,但活动连接较少的情况。

image-20220925162946375