19 select感知多个I-O事件
什么是I/O多路复用
在 第11讲 中,我们设计了这样一个应用程序,该程序从标准输入接收数据输入,然后通过套接字发送出去,同时,该程序也通过套接字接收对方发送的数据流。
我们可以使用fgets方法等待标准输入,但是一旦这样做,就没有办法在套接字有数据的时候读出数据;我们也可以使用read方法等待套接字有数据返回,但是这样做,也没有办法在标准输入有数据的情况下,读入数据并发送给对方。
I/O多路复用的设计初衷就是解决这样的场景。我们可以把标准输入、套接字等都看做I/O的一路,多路复用的意思,就是在任何一路I/O有“事件”发生的情况下,通知应用程序去处理相应的I/O事件,这样我们的程序就变成了“多面手”,在同一时刻仿佛可以处理多个I/O事件。
像刚才的例子,使用I/O复用以后,如果标准输入有数据,立即从标准输入读入数据,通过套接字发送出去;如果套接字有数据可以读,立即可以读出数据。
select函数就是这样一种常见的I/O多路复用技术,我们将在后面继续讲解其他的多路复用技术。使用select函数,通知内核挂起进程,当一个或多个I/O事件发生后,控制权返还给应用程序,由应用程序进行I/O事件的处理。
这些I/O事件的类型非常多,比如:
- 标准输入文件描述符准备好可以读。
- 监听套接字准备好,新的连接已经建立成功。
- 已连接套接字准备好可以写。
- 如果一个I/O事件等待超过了10秒,发生了超时事件。
select函数的使用方法
select函数的使用方法有点复杂,我们先看一下它的声明:
int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
在这个函数中,maxfd表示的是待测试的描述符基数,它的值是待测试的最大描述符加1。比如现在的select待测试的描述符集合是{0,1,4},那么maxfd就是5,为啥是5,而不是4呢? 我会在下面进行解释。
紧接着的是三个描述符集合,分别是读描述符集合readset、写描述符集合writeset和异常描述符集合exceptset,这三个分别通知内核,在哪些描述符上检测数据可以读,可以写和有异常发生。
那么如何设置这些描述符集合呢?以下的宏可以帮助到我们。
void FD_ZERO(fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
int FD_ISSET(int fd, fd_set *fdset);
如果你刚刚入门,理解这些宏可能有些困难。没有关系,我们可以这样想象,下面一个向量代表了一个描述符集合,其中,这个向量的每个元素都是二进制数中的0或者1。
a[maxfd-1], ..., a[1], a[0]
我们按照这样的思路来理解这些宏:
- FD_ZERO用来将这个向量的所有元素都设置成0;
- FD_SET用来把对应套接字fd的元素,a[fd]设置成1;
- FD_CLR用来把对应套接字fd的元素,a[fd]设置成0;
- FD_ISSET对这个向量进行检测,判断出对应套接字的元素a[fd]是0还是1。
其中0代表不需要处理,1代表需要处理。
怎么样,是不是感觉豁然开朗了?
实际上,很多系统是用一个整型数组来表示一个描述字集合的,一个32位的整型数可以表示32个描述字,例如第一个整型数表示0-31描述字,第二个整型数可以表示32-63描述字,以此类推。
这个时候再来理解为什么描述字集合{0,1,4},对应的maxfd是5,而不是4,就比较方便了。
因为这个向量对应的是下面这样的:
a[4],a[3],a[2],a[1],a[0]
待测试的描述符个数显然是5, 而不是4。
三个描述符集合中的每一个都可以设置成空,这样就表示不需要内核进行相关的检测。
最后一个参数是timeval结构体时间:
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
这个参数设置成不同的值,会有不同的可能:
第一个可能是设置成空(NULL),表示如果没有I/O事件发生,则select一直等待下去。
第二个可能是设置一个非零的值,这个表示等待固定的一段时间后从select阻塞调用中返回,这在 第12讲 超时的例子里曾经使用过。
第三个可能是将tv_sec和tv_usec都设置成0,表示根本不等待,检测完毕立即返回。这种情况使用得比较少。
程序例子
下面是一个具体的程序例子,我们通过这个例子来理解select函数。
int main(int argc, char **argv) {
if (argc != 2) {
error(1, 0, "usage: select01 <IPaddress>");
}
int socket_fd = tcp_client(argv[1], SERV_PORT);
char recv_line[MAXLINE], send_line[MAXLINE];
int n;
fd_set readmask;
fd_set allreads;
FD_ZERO(&allreads);
FD_SET(0, &allreads);
FD_SET(socket_fd, &allreads);
for (;;) {
readmask = allreads;
int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);
if (rc <= 0) {
error(1, errno, "select failed");
}
if (FD_ISSET(socket_fd, &readmask)) {
n = read(socket_fd, recv_line, MAXLINE);
if (n < 0) {
error(1, errno, "read error");
} else if (n == 0) {
error(1, 0, "server terminated \n");
}
recv_line[n] = 0;
fputs(recv_line, stdout);
fputs("\n", stdout);
}
if (FD_ISSET(STDIN_FILENO, &readmask)) {
if (fgets(send_line, MAXLINE, stdin) != NULL) {
int i = strlen(send_line);
if (send_line[i - 1] == '\n') {
send_line[i - 1] = 0;
}
printf("now sending %s\n", send_line);
size_t rt = write(socket_fd, send_line, strlen(send_line));
if (rt < 0) {
error(1, errno, "write failed ");
}
printf("send bytes: %zu \n", rt);
}
}
}
}
程序的12行通过FD_ZERO初始化了一个描述符集合,这个描述符读集合是空的:
接下来程序的第13和14行,分别使用FD_SET将描述符0,即标准输入,以及连接套接字描述符3设置为待检测:
接下来的16-51行是循环检测,这里我们没有阻塞在fgets或read调用,而是通过select来检测套接字描述字有数据可读,或者标准输入有数据可读。比如,当用户通过标准输入使得标准输入描述符可读时,返回的readmask的值为:
这个时候select调用返回,可以使用FD_ISSET来判断哪个描述符准备好可读了。如上图所示,这个时候是标准输入可读,37-51行程序读入后发送给对端。
如果是连接描述字准备好可读了,第24行判断为真,使用read将套接字数据读出。
我们需要注意的是,这个程序的17-18行非常重要,初学者很容易在这里掉坑里去。
第17行是每次测试完之后,重新设置待测试的描述符集合。你可以看到上面的例子,在select测试之前的数据是{0,3},select测试之后就变成了{0}。
这是因为select调用每次完成测试之后,内核都会修改描述符集合,通过修改完的描述符集合来和应用程序交互,应用程序使用FD_ISSET来对每个描述符进行判断,从而知道什么样的事件发生。
第18行则是使用socket_fd+1来表示待测试的描述符基数。切记需要+1。
套接字描述符就绪条件
当我们说select测试返回,某个套接字准备好可读,表示什么样的事件发生呢?
第一种情况是套接字接收缓冲区有数据可以读,如果我们使用read函数去执行读操作,肯定不会被阻塞,而是会直接读到这部分数据。
第二种情况是对方发送了FIN,使用read函数执行读操作,不会被阻塞,直接返回0。
第三种情况是针对一个监听套接字而言的,有已经完成的连接建立,此时使用accept函数去执行不会阻塞,直接返回已经完成的连接。
第四种情况是套接字有错误待处理,使用read函数去执行读操作,不阻塞,且返回-1。
总结成一句话就是,内核通知我们套接字有数据可以读了,使用read函数不会阻塞。
不知道你是不是和我一样,刚开始理解某个套接字可写的时候,会有一个错觉,总是从应用程序角度出发去理解套接字可写,我开始是这样想的,当应用程序完成相应的计算,有数据准备发送给对端了,可以往套接字写,对应的就是套接字可写。
其实这个理解是非常不正确的,select检测套接字可写, 完全是基于套接字本身的特性来说 的,具体来说有以下几种情况。
第一种是套接字发送缓冲区足够大,如果我们使用阻塞套接字进行write操作,将不会被阻塞,直接返回。
第二种是连接的写半边已经关闭,如果继续进行写操作将会产生SIGPIPE信号。
第三种是套接字上有错误待处理,使用write函数去执行写操作,不阻塞,且返回-1。
总结成一句话就是,内核通知我们套接字可以往里写了,使用write函数就不会阻塞。
总结
今天我讲了select函数的使用。select函数提供了最基本的I/O多路复用方法,在使用select时,我们需要建立两个重要的认识:
- 描述符基数是当前最大描述符+1;
- 每次select调用完成之后,记得要重置待测试集合。
思考题
select可以对诸如UNIX管道(pipe)这样的描述字进行检测么?如果可以,检测的就绪条件是什么呢?
一个描述符集合哪些描述符被设置为1,需要进行检测是完全可以知道的,你认为select函数里一定需要传入描述字基数这个值么?
nfds 参数非常重要,因为它告诉 select() 函数要关注的最大文件描述符的范围。这是因为 select() 函数内部会创建一个足够大的数组来存储每个文件描述符的状态,而 nfds 决定了数组的大小。
1.内存分配: select() 需要知道需要为文件描述符分配多少内存。
2.性能优化: 通过设置 nfds,内核可以避免为不需要检查的文件描述符分配内存和维护状态。
3.边界: 它提供了一个边界,确保不会访问不存在的文件描述符。
为啥说select函数对fd有1024的限制?
首先,man select,搜索FD_SETSIZE会看到如下的内容
An fd_set is a fixed size buffer. Executing FD_CLR() or FD_SET() with a value of fd that is ne
gative or is equal to or larger than FD_SETSIZE will result in undefined behavior. Moreover, P
OSIX requires fd to be a valid file descriptor.
其中最关键的是FD_SETSIZE,是在bitmap位图运算的时候会受到他的影响
其次,sys/select.h头文件有如下定义:
#define FD_SETSIZE __FD_SETSIZE
typesizes.h头文件有如下定义:
#define __FD_SETSIZE 1024
20 poll
select方法是多个UNIX平台支持的非常常见的I/O多路复用技术,它通过描述符集合来表示检测的I/O对象,通过三个不同的描述符集合来描述I/O事件 :可读、可写和异常。但是select有一个缺点,那就是所支持的文件描述符的个数是有限的。在Linux系统中,select的默认最大值为1024。
那么有没有别的I/O多路复用技术可以突破文件描述符个数限制呢?当然有,这就是poll函数。
poll函数介绍
poll是除了select之外,另一种普遍使用的I/O多路复用技术,和select相比,它和内核交互的数据结构有所变化,另外,也突破了文件描述符的个数限制。
下面是poll函数的原型:
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
返回值:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
这个函数里面输入了三个参数,第一个参数是一个pollfd的数组。其中pollfd的结构如下:
struct pollfd {
int fd; /* file descriptor */
short events; /* events to look for */
short revents; /* events returned */
};
这个结构体由三个部分组成,首先是描述符fd,然后是描述符上待检测的事件类型events,注意这里的events可以表示多个不同的事件,具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLIN和POLLOUT可以表示读和写事件。
#define POLLIN 0x0001 /* any readable data available */
#define POLLPRI 0x0002 /* OOB/Urgent readable data */
#define POLLOUT 0x0004 /* file descriptor is writeable */
和select非常不同的地方在于,poll每次检测之后的结果不会修改原来的传入值,而是将结果保留在revents字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。我们可以把revents理解成“returned events”。
events类型的事件可以分为两大类。
第一类是可读事件,有以下几种:
#define POLLIN 0x0001 /* any readable data available */
#define POLLPRI 0x0002 /* OOB/Urgent readable data */
#define POLLRDNORM 0x0040 /* non-OOB/URG data available */
#define POLLRDBAND 0x0080 /* OOB/Urgent readable data */
一般我们在程序里面有POLLIN即可。套接字可读事件和select的readset基本一致,是系统内核通知应用程序有数据可以读,通过read函数执行操作不会被阻塞。
第二类是可写事件,有以下几种:
#define POLLOUT 0x0004 /* file descriptor is writeable */
#define POLLWRNORM POLLOUT /* no write type differentiation */
#define POLLWRBAND 0x0100 /* OOB/Urgent data can be written */
一般我们在程序里面统一使用POLLOUT。套接字可写事件和select的writeset基本一致,是系统内核通知套接字缓冲区已准备好,通过write函数执行写操作不会被阻塞。
以上两大类的事件都可以在“returned events”得到复用。还有另一大类事件,没有办法通过poll向系统内核递交检测请求,只能通过“returned events”来加以检测,这类事件是各种错误事件。
#define POLLERR 0x0008 /* 一些错误发送 */
#define POLLHUP 0x0010 /* 描述符挂起*/
#define POLLNVAL 0x0020 /* 请求的事件无效*/
看一下poll函数的原型。参数nfds描述的是数组fds的大小,简单说,就是向poll申请的事件检测的个数。
最后一个参数timeout,描述了poll的行为。
如果是一个<0的数,表示在有事件发生之前永远等待;如果是0,表示不阻塞进程,立即返回;如果是一个>0的数,表示poll调用方等待指定的毫秒数后返回。
关于返回值,当有错误发生时,poll函数的返回值为-1;如果在指定的时间到达之前没有任何事件发生,则返回0,否则就返回检测到的事件个数,也就是“returned events”中非0的描述符个数。
poll函数有一点非常好,如果我们 不想对某个pollfd结构进行事件检测, 可以把它对应的pollfd结构的fd成员设置成一个负值。这样,poll函数将忽略这样的events事件,检测完成以后,所对应的“returned events”的成员值也将设置为0。
和select函数对比一下,我们发现poll函数和select不一样的地方就是,在select里面,文件描述符的个数已经随着fd_set的实现而固定,没有办法对此进行配置;而在poll函数里,我们可以控制pollfd结构的数组大小,这意味着我们可以突破原来select函数最大描述符的限制,在这种情况下,应用程序调用者需要分配pollfd数组并通知poll函数该数组的大小。
基于poll的服务器程序
下面我们将开发一个基于poll的服务器程序。这个程序可以同时处理多个客户端连接,并且一旦有客户端数据接收后,同步地回显回去。这已经是一个颇具高并发处理的服务器原型了,再加上后面讲到的非阻塞I/O和多线程等技术,基本上就是可使用的准生产级别了。
所以,让我们打起精神,一起来看这个程序。
#define INIT_SIZE 128
int main(int argc, char **argv) {
int listen_fd, connected_fd;
int ready_number;
ssize_t n;
char buf[MAXLINE];
struct sockaddr_in client_addr;
listen_fd = tcp_server_listen(SERV_PORT);
//初始化pollfd数组,这个数组的第一个元素是listen_fd,其余的用来记录将要连接的connect_fd
struct pollfd event_set[INIT_SIZE];
event_set[0].fd = listen_fd;
event_set[0].events = POLLRDNORM;
// 用-1表示这个数组位置还没有被占用
int i;
for (i = 1; i < INIT_SIZE; i++) {
event_set[i].fd = -1;
}
for (;;) {
if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) {
error(1, errno, "poll failed ");
}
if (event_set[0].revents & POLLRDNORM) {
socklen_t client_len = sizeof(client_addr);
connected_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len);
//找到一个可以记录该连接套接字的位置
for (i = 1; i < INIT_SIZE; i++) {
if (event_set[i].fd < 0) {
event_set[i].fd = connected_fd;
event_set[i].events = POLLRDNORM;
break;
}
}
if (i == INIT_SIZE) {
error(1, errno, "can not hold so many clients");
}
if (--ready_number <= 0)
continue;
}
for (i = 1; i < INIT_SIZE; i++) {
int socket_fd;
if ((socket_fd = event_set[i].fd) < 0)
continue;
if (event_set[i].revents & (POLLRDNORM | POLLERR)) {
if ((n = read(socket_fd, buf, MAXLINE)) > 0) {
if (write(socket_fd, buf, n) < 0) {
error(1, errno, "write error");
}
} else if (n == 0 || errno == ECONNRESET) {
close(socket_fd);
event_set[i].fd = -1;
} else {
error(1, errno, "read error");
}
if (--ready_number <= 0)
break;
}
}
}
}
当然,一开始需要创建一个监听套接字,并绑定在本地的地址和端口上,这在第10行调用tcp_server_listen函数来完成。
在第13行,我初始化了一个pollfd数组,并命名为event_set,之所以叫这个名字,是引用pollfd数组确实代表了检测的事件集合。这里数组的大小固定为INIT_SIZE,这在实际的生产环境肯定是需要改进的。
我在前面讲过,监听套接字上如果有连接建立完成,也是可以通过 I/O事件复用来检测到的。在第14-15行,将监听套接字listen_fd和对应的POLLRDNORM事件加入到event_set里,表示我们期望系统内核检测监听套接字上的连接建立完成事件。
在前面介绍poll函数时,我们提到过,如果对应pollfd里的文件描述字fd为负数,poll函数将会忽略这个pollfd,所以我们在第18-21行将event_set数组里其他没有用到的fd统统设置为-1。这里-1也表示了当前pollfd没有被使用的意思。
下面我们的程序进入一个无限循环,在这个循环体内,第24行调用poll函数来进行事件检测。poll函数传入的参数为event_set数组,数组大小INIT_SIZE和-1。这里之所以传入INIT_SIZE,是因为poll函数已经能保证可以自动忽略fd为-1的pollfd,否则我们每次都需要计算一下event_size里真正需要被检测的元素大小;timeout设置为-1,表示在I/O事件发生之前poll调用一直阻塞。
如果系统内核检测到监听套接字上的连接建立事件,就进入到第28行的判断分支。我们看到,使用了如event_set[0].revent来和对应的事件类型进行位与操作,这个技巧大家一定要记住,这是因为event都是通过二进制位来进行记录的,位与操作是和对应的二进制位进行操作,一个文件描述字是可以对应到多个事件类型的。
在这个分支里,调用accept函数获取了连接描述字。接下来,33-38行做了一件事,就是把连接描述字connect_fd也加入到event_set里,而且说明了我们感兴趣的事件类型为POLLRDNORM,也就是套接字上有数据可以读。在这里,我们从数组里查找一个没有没占用的位置,也就是fd为-1的位置,然后把fd设置为新的连接套接字connect_fd。
如果在数组里找不到这样一个位置,说明我们的event_set已经被很多连接充满了,没有办法接收更多的连接了,这就是第41-42行所做的事情。
第45-46行是一个加速优化能力,因为poll返回的一个整数,说明了这次I/O事件描述符的个数,如果处理完监听套接字之后,就已经完成了这次I/O复用所要处理的事情,那么我们就可以跳过后面的处理,再次进入poll调用。
接下来的循环处理是查看event_set里面其他的事件,也就是已连接套接字的可读事件。这是通过遍历event_set数组来完成的。
如果数组里的pollfd的fd为-1,说明这个pollfd没有递交有效的检测,直接跳过;来到第53行,通过检测revents的事件类型是POLLRDNORM或者POLLERR,我们可以进行读操作。在第54行,读取数据正常之后,再通过write操作回显给客户端;在第58行,如果读到EOF或者是连接重置,则关闭这个连接,并且把event_set对应的pollfd重置;第61行读取数据失败。
和前面的优化加速处理一样,第65-66行是判断如果事件已经被完全处理完之后,直接跳过对event_set的循环处理,再次来到poll调用。
实验
我们启动这个服务器程序,然后通过telnet连接到这个服务器程序。为了检验这个服务器程序的I/O复用能力,我们可以多开几个telnet客户端,并且在屏幕上输入各种字符串。
客户端1:
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
a
a
aaaaaaaaaaa
aaaaaaaaaaa
afafasfa
afafasfa
fbaa
fbaa
^]
telnet> quit
Connection closed.
客户端2:
telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
b
b
bbbbbbb
bbbbbbb
bbbbbbb
bbbbbbb
^]
telnet> quit
Connection closed.
可以看到,这两个客户端互不影响,每个客户端输入的字符很快会被回显到客户端屏幕上。一个客户端断开连接,也不会影响到其他客户端。
总结
poll是另一种在各种UNIX系统上被广泛支持的I/O多路复用技术,虽然名声没有select那么响,能力一点不比select差,而且因为可以突破select文件描述符的个数限制,在高并发的场景下尤其占优势。
21 非阻塞IO:提升性能的加速器
非阻塞I/O配合I/O多路复用,是高性能网络编程中的常见技术。
阻塞 VS 非阻塞
当应用程序调用阻塞I/O完成某个操作时,应用程序会被挂起,等待内核完成操作,感觉上应用程序像是被“阻塞”了一样。实际上,内核所做的事情是将CPU时间切换给其他有需要的进程,网络应用程序在这种情况下就会得不到CPU时间做该做的事情。
非阻塞I/O则不然,当应用程序调用非阻塞I/O完成某个操作时,内核立即返回,不会把CPU时间切换给其他进程,应用程序在返回后,可以得到足够的CPU时间继续完成其他事情。
如果拿去书店买书举例子,阻塞I/O对应什么场景呢? 你去了书店,告诉老板(内核)你想要某本书,然后你就一直在那里等着,直到书店老板翻箱倒柜找到你想要的书,有可能还要帮你联系全城其它分店。注意,这个过程中你一直滞留在书店等待老板的回复,好像在书店老板这里”阻塞”住了。
那么非阻塞I/O呢?你去了书店,问老板有没你心仪的那本书,老板查了下电脑,告诉你没有,你就悻悻离开了。一周以后,你又来这个书店,再问这个老板,老板一查,有了,于是你买了这本书。注意,这个过程中,你没有被阻塞,而是在不断轮询。
但轮询的效率太低了,于是你向老板提议:“老板,到货给我打电话吧,我再来付钱取书。”这就是前面讲到的I/O多路复用。
再进一步,你连去书店取书也想省了,得了,让老板代劳吧,你留下地址,付了书费,让老板到货时寄给你,你直接在家里拿到就可以看了。这就是我们将会在第30讲中讲到的异步I/O。
这几个I/O模型,再加上进程、线程模型,构成了整个网络编程的知识核心。
按照使用场景,**非阻塞I/O可以被用到读操作、写操作、接收连接操作和发起连接操作上。
**
非阻塞I/O
读操作
如果套接字对应的接收缓冲区没有数据可读,在非阻塞情况下read调用会立即返回,一般返回EWOULDBLOCK或EAGAIN出错信息。在这种情况下,出错信息是需要小心处理,比如后面再次调用read操作,而不是直接作为错误直接返回。这就好像去书店买书没买到离开一样,需要不断进行又一次轮询处理。
写操作
不知道你有没有注意到,在阻塞I/O情况下,write函数返回的字节数,和输入的参数总是一样的。如果返回值总是和输入的数据大小一样,write等写入函数还需要定义返回值吗?我不知道你是不是和我一样,刚接触到这一部分知识的时候有这种困惑。
这里就要引出我们所说的非阻塞I/O。在非阻塞I/O的情况下,如果套接字的发送缓冲区已达到了极限,不能容纳更多的字节,那么操作系统内核会 尽最大可能 从应用程序拷贝数据到发送缓冲区中,并立即从write等函数调用中返回。可想而知,在拷贝动作发生的瞬间,有可能一个字符也没拷贝,有可能所有请求字符都被拷贝完成,那么这个时候就需要返回一个数值,告诉应用程序到底有多少数据被成功拷贝到了发送缓冲区中,应用程序需要再次调用write函数,以输出未完成拷贝的字节。
write等函数是可以同时作用到阻塞I/O和非阻塞I/O上的,为了复用一个函数,处理非阻塞和阻塞I/O多种情况,设计出了写入返回值,并用这个返回值表示实际写入的数据大小。
也就是说,非阻塞I/O和阻塞I/O处理的方式是不一样的。
非阻塞I/O需要这样:拷贝→返回→再拷贝→再返回。
而阻塞I/O需要这样:拷贝→直到所有数据拷贝至发送缓冲区完成→返回。
不过在实战中,你可以不用区别阻塞和非阻塞I/O,使用循环的方式来写入数据就好了。只不过在阻塞I/O的情况下,循环只执行一次就结束了。
writen函数的实现。
/* 向文件描述符fd写入n字节数 */
ssize_t writen(int fd, const void * data, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr;
ptr = data;
nleft = n;
//如果还有数据没被拷贝完成,就一直循环
while (nleft > 0) {
if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
/* 这里EAGAIN是非阻塞non-blocking情况下,通知我们再次调用write() */
if (nwritten < 0 && errno == EAGAIN)
nwritten = 0;
else
return -1; /* 出错退出 */
}
/* 指针增大,剩下字节数变小*/
nleft -= nwritten;
ptr += nwritten;
}
return n;
}
下面我通过一张表来总结一下read和write在阻塞模式和非阻塞模式下的不同行为特性:
关于read和write还有几个结论,你需要把握住:
- read总是在接收缓冲区有数据时就立即返回,不是等到应用程序给定的数据充满才返回。当接收缓冲区为空时,阻塞模式会等待,非阻塞模式立即返回-1,并有EWOULDBLOCK或EAGAIN错误。
- 和read不同,阻塞模式下,write只有在发送缓冲区足以容纳应用程序的输出字节时才返回;而非阻塞模式下,则是能写入多少就写入多少,并返回实际写入的字节数。
- 阻塞模式下的write有个特例, 就是对方主动关闭了套接字,这个时候write调用会立即返回,并通过返回值告诉应用程序实际写入的字节数,如果再次对这样的套接字进行write操作,就会返回失败。失败是通过返回值-1来通知到应用程序的。
accept
当accept和I/O多路复用select、poll等一起配合使用时,如果在监听套接字上触发事件,说明有连接建立完成,此时调用accept肯定可以返回已连接套接字。这样看来,似乎把监听套接字设置为非阻塞,没有任何好处。
为了说明这个问题,我们构建一个客户端程序,其中最关键的是,一旦连接建立,设置SO_LINGER套接字选项,把l_onoff标志设置为1,把l_linger时间设置为0。这样,连接被关闭时,TCP套接字上将会发送一个RST。
struct linger ling;
ling.l_onoff = 1;
ling.l_linger = 0;
setsockopt(socket_fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
close(socket_fd);
服务器端使用select I/O多路复用,不过,监听套接字仍然是blocking的。如果监听套接字上有事件发生,休眠5秒,以便模拟高并发场景下的情形。
if (FD_ISSET(listen_fd, &readset)) {
printf("listening socket readable\n");
sleep(5);
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
这里的休眠时间非常关键,这样,在监听套接字上有可读事件发生时,并没有马上调用accept。由于客户端发生了RST分节,该连接被接收端内核从自己的已完成队列中删除了,此时再调用accept,由于没有已完成连接(假设没有其他已完成连接),accept一直阻塞,更为严重的是,该线程再也没有机会对其他I/O事件进行分发,相当于该服务器无法对其他I/O进行服务。
如果我们将监听套接字设为非阻塞,上述的情形就不会再发生。只不过对于accept的返回值,需要正确地处理各种看似异常的错误,例如忽略EWOULDBLOCK、EAGAIN等。
这个例子给我们的启发是,一定要将监听套接字设置为非阻塞的,尽管这里休眠时间5秒有点夸张,但是在极端情况下处理不当的服务器程序是有可能碰到例子所阐述的情况,为了让服务器程序在极端情况下工作正常,这点工作还是非常值得的。
connect
在非阻塞TCP套接字上调用connect函数,会立即返回一个EINPROGRESS错误。TCP三次握手会正常进行,应用程序可以继续做其他初始化的事情。当该连接建立成功或者失败时,通过I/O多路复用select、poll等可以进行连接的状态检测。
非阻塞I/O + select多路复用
我在这里给出了一个非阻塞I/O搭配select多路复用的例子。
#define MAX_LINE 1024
#define FD_INIT_SIZE 128
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
//数据缓冲区
struct Buffer {
int connect_fd; //连接字
char buffer[MAX_LINE]; //实际缓冲
size_t writeIndex; //缓冲写入位置
size_t readIndex; //缓冲读取位置
int readable; //是否可以读
};
struct Buffer *alloc_Buffer() {
struct Buffer *buffer = malloc(sizeof(struct Buffer));
if (!buffer)
return NULL;
buffer->connect_fd = 0;
buffer->writeIndex = buffer->readIndex = buffer->readable = 0;
return buffer;
}
void free_Buffer(struct Buffer *buffer) {
free(buffer);
}
int onSocketRead(int fd, struct Buffer *buffer) {
char buf[1024];
int i;
ssize_t result;
while (1) {
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;
for (i = 0; i < result; ++i) {
if (buffer->writeIndex < sizeof(buffer->buffer))
buffer->buffer[buffer->writeIndex++] = rot13_char(buf[i]);
if (buf[i] == '\n') {
buffer->readable = 1; //缓冲区可以读
}
}
}
if (result == 0) {
return 1;
} else if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}
return 0;
}
int onSocketWrite(int fd, struct Buffer *buffer) {
while (buffer->readIndex < buffer->writeIndex) {
ssize_t result = send(fd, buffer->buffer + buffer->readIndex, buffer->writeIndex - buffer->readIndex, 0);
if (result < 0) {
if (errno == EAGAIN)
return 0;
return -1;
}
buffer->readIndex += result;
}
if (buffer->readIndex == buffer->writeIndex)
buffer->readIndex = buffer->writeIndex = 0;
buffer->readable = 0;
return 0;
}
int main(int argc, char **argv) {
int listen_fd;
int i, maxfd;
struct Buffer *buffer[FD_INIT_SIZE];
for (i = 0; i < FD_INIT_SIZE; ++i) {
buffer[i] = alloc_Buffer();
}
listen_fd = tcp_nonblocking_server_listen(SERV_PORT);
fd_set readset, writeset, exset;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
while (1) {
maxfd = listen_fd;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&exset);
// listener加入readset
FD_SET(listen_fd, &readset);
for (i = 0; i < FD_INIT_SIZE; ++i) {
if (buffer[i]->connect_fd > 0) {
if (buffer[i]->connect_fd > maxfd)
maxfd = buffer[i]->connect_fd;
FD_SET(buffer[i]->connect_fd, &readset);
if (buffer[i]->readable) {
FD_SET(buffer[i]->connect_fd, &writeset);
}
}
}
if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
error(1, errno, "select error");
}
if (FD_ISSET(listen_fd, &readset)) {
printf("listening socket readable\n");
sleep(5);
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else if (fd > FD_INIT_SIZE) {
error(1, 0, "too many connections");
close(fd);
} else {
make_nonblocking(fd);
if (buffer[fd]->connect_fd == 0) {
buffer[fd]->connect_fd = fd;
} else {
error(1, 0, "too many connections");
}
}
}
for (i = 0; i < maxfd + 1; ++i) {
int r = 0;
if (i == listen_fd)
continue;
if (FD_ISSET(i, &readset)) {
r = onSocketRead(i, buffer[i]);
}
if (r == 0 && FD_ISSET(i, &writeset)) {
r = onSocketWrite(i, buffer[i]);
}
if (r) {
buffer[i]->connect_fd = 0;
close(i);
}
}
}
}
第93行,调用fcntl将监听套接字设置为非阻塞。
fcntl(fd, F_SETFL, O_NONBLOCK);
第121行调用select进行I/O事件分发处理。
131-142行在处理新的连接套接字,注意这里也把连接套接字设置为非阻塞的。
151-156行在处理连接套接字上的I/O读写事件,这里我们抽象了一个Buffer对象,Buffer对象使用了readIndex和writeIndex分别表示当前缓冲的读写位置。
实验
启动该服务器:
$./nonblockingserver
使用多个telnet客户端连接该服务器,可以验证交互正常。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
fasfasfasf
snfsnfsnfs
总结
非阻塞I/O可以使用在read、write、accept、connect等多种不同的场景,在非阻塞I/O下,使用轮询的方式引起CPU占用率高,所以一般将非阻塞I/O和I/O多路复用技术select、poll等搭配使用,在非阻塞I/O事件发生时,再调用对应事件的处理函数。这种方式,极大地提高了程序的健壮性和稳定性,是Linux下高性能网络编程的首选。
22 Linux利器:epoll
这里有放置了一张图,这张图来自The Linux Programming Interface(No Starch Press)。这张图直观地为我们展示了select、poll、epoll几种不同的I/O复用技术在面对不同文件描述符大小时的表现差异。
从图中可以明显地看到,epoll的性能是最好的,即使在多达10000个文件描述的情况下,其性能的下降和有10个文件描述符的情况相比,差别也不是很大。而随着文件描述符的增大,常规的select和poll方法性能逐渐变得很差。
那么,epoll究竟使用了什么样的“魔法”,取得了如此令人惊讶的效果呢?
epoll的用法
在分析对比epoll、poll和select几种技术之前,我们先看一下怎么使用epoll来完成一个服务器程序。
epoll可以说是和poll非常相似的一种I/O多路复用技术,有些人将epoll归为异步I/O,我觉得这是不正确的。本质上epoll还是一种I/O多路复用技术, epoll通过监控注册的多个描述字,来进行I/O事件的分发处理。不同于poll的是,epoll不仅提供了默认的level-triggered(条件触发)机制,还提供了性能更为强劲的edge-triggered(边缘触发)机制。
使用epoll进行网络程序的编写,需要三个步骤,分别是epoll_create,epoll_ctl和epoll_wait。接下来我对这几个API详细展开讲一下。
epoll_create
int epoll_create(int size);
int epoll_create1(int flags);
返回值: 若成功返回一个大于0的值,表示epoll实例;若返回-1表示出错
epoll_create()方法创建了一个epoll实例,从Linux 2.6.8开始,参数size被自动忽略,但是该值仍需要一个大于0的整数。这个epoll实例被用来调用epoll_ctl和epoll_wait,如果这个epoll实例不再需要,比如服务器正常关机,需要调用close()方法释放epoll实例,这样系统内核可以回收epoll实例所分配使用的内核资源。
关于这个参数size,在一开始的epoll_create实现中,是用来告知内核期望监控的文件描述字大小,然后内核使用这部分的信息来初始化内核数据结构,在新的实现中,这个参数不再被需要,因为内核可以动态分配需要的内核数据结构。我们只需要注意,每次将size设置成一个大于0的整数就可以了。
epoll_create1()的用法和epoll_create()基本一致,如果epoll_create1()的输入flags为0,则和epoll_create()一样,内核自动忽略。可以增加如EPOLL_CLOEXEC的额外选项,如果你有兴趣的话,可以研究一下这个选项有什么意义。
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
返回值: 若成功返回0;若返回-1表示出错
在创建完epoll实例之后,可以通过调用epoll_ctl往这个epoll实例增加或删除监控的事件。函数epll_ctl有4个入口参数。
第一个参数epfd是刚刚调用epoll_create创建的epoll实例描述字,可以简单理解成是epoll句柄。
第二个参数表示增加还是删除一个监控事件,它有三个选项可供选择:
- EPOLL_CTL_ADD: 向epoll实例注册文件描述符对应的事件;
- EPOLL_CTL_DEL:向epoll实例删除文件描述符对应的事件;
- EPOLL_CTL_MOD: 修改文件描述符对应的事件。
第三个参数是注册的事件的文件描述符,比如一个监听套接字。
第四个参数表示的是注册的事件类型,并且可以在这个结构体里设置用户需要的数据,其中最为常见的是使用联合结构里的fd字段,表示事件所对应的文件描述符。
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
我们在前面介绍poll的时候已经接触过基于mask的事件类型了,这里epoll仍旧使用了同样的机制,我们重点看一下这几种事件类型:
- EPOLLIN:表示对应的文件描述字可以读;
- EPOLLOUT:表示对应的文件描述字可以写;
- EPOLLRDHUP:表示套接字的一端已经关闭,或者半关闭;
- EPOLLHUP:表示对应的文件描述字被挂起;
- EPOLLET:设置为edge-triggered,默认为level-triggered。
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
返回值: 成功返回的是一个大于0的数,表示事件的个数;返回0表示的是超时时间到;若出错返回-1.
epoll_wait()函数类似之前的poll和select函数,调用者进程被挂起,在等待内核I/O事件的分发。
这个函数的第一个参数是epoll实例描述字,也就是epoll句柄。
第二个参数返回给用户空间需要处理的I/O事件,这是一个数组,数组的大小由epoll_wait的返回值决定,这个数组的每个元素都是一个需要待处理的I/O事件,其中events表示具体的事件类型,事件类型取值和epoll_ctl可设置的值一样,这个epoll_event结构体里的data值就是在epoll_ctl那里设置的data,也就是用户空间和内核空间调用时需要的数据。
第三个参数是一个大于0的整数,表示epoll_wait可以返回的最大事件值。
第四个参数是epoll_wait阻塞调用的超时值,如果这个值设置为-1,表示不超时;如果设置为0则立即返回,即使没有任何I/O事件发生。
epoll例子
代码解析
下面我们把原先基于poll的服务器端程序改造成基于epoll的:
#include "lib/common.h"
#define MAXEVENTS 128
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
int main(int argc, char **argv) {
int listen_fd, socket_fd;
int n, i;
int efd;
struct epoll_event event;
struct epoll_event *events;
listen_fd = tcp_nonblocking_server_listen(SERV_PORT);
efd = epoll_create1(0);
if (efd == -1) {
error(1, errno, "epoll create failed");
}
event.data.fd = listen_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
error(1, errno, "epoll_ctl add listen fd failed");
}
/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof(event));
while (1) {
n = epoll_wait(efd, events, MAXEVENTS, -1);
printf("epoll_wait wakeup\n");
for (i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (listen_fd == events[i].data.fd) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
make_nonblocking(fd);
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET; //edge-triggered
if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1) {
error(1, errno, "epoll_ctl add connection fd failed");
}
}
continue;
} else {
socket_fd = events[i].data.fd;
printf("get event on socket fd == %d \n", socket_fd);
while (1) {
char buf[512];
if ((n = read(socket_fd, buf, sizeof(buf))) < 0) {
if (errno != EAGAIN) {
error(1, errno, "read error");
close(socket_fd);
}
break;
} else if (n == 0) {
close(socket_fd);
break;
} else {
for (i = 0; i < n; ++i) {
buf[i] = rot13_char(buf[i]);
}
if (write(socket_fd, buf, n) < 0) {
error(1, errno, "write error");
}
}
}
}
}
}
free(events);
close(listen_fd);
}
程序的第23行调用epoll_create0创建了一个epoll实例。
28-32行,调用epoll_ctl将监听套接字对应的I/O事件进行了注册,这样在有新的连接建立之后,就可以感知到。注意这里使用的是edge-triggered(边缘触发)。
35行为返回的event数组分配了内存。
主循环调用epoll_wait函数分发I/O事件,当epoll_wait成功返回时,通过遍历返回的event数组,就直接可以知道发生的I/O事件。
第41-46行判断了各种错误情况。
第47-61行是监听套接字上有事件发生的情况下,调用accept获取已建立连接,并将该连接设置为非阻塞,再调用epoll_ctl把已连接套接字对应的可读事件注册到epoll实例中。这里我们使用了event_data里面的fd字段,将连接套接字存储其中。
第63-84行,处理了已连接套接字上的可读事件,读取字节流,编码后再回应给客户端。
实验
启动该服务器:
$./epoll01
epoll_wait wakeup
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 5
再启动几个telnet客户端,可以看到有连接建立情况下,epoll_wait迅速从挂起状态结束;并且套接字上有数据可读时,epoll_wait也迅速结束挂起状态,这时候通过read可以读取套接字接收缓冲区上的数据。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
fasfsafas
snfsfnsnf
^]
telnet> quit
Connection closed.
edge-triggered VS level-triggered
对于edge-triggered和level-triggered, 官方的说法是一个是边缘触发,一个是条件触发。也有文章从电子脉冲角度来解读的,总体上,给初学者的带来的感受是理解上有困难。
这里有两个程序,我们用这个程序来说明一下这两者之间的不同。
在这两个程序里,即使已连接套接字上有数据可读,我们也不调用read函数去读,只是简单地打印出一句话。
第一个程序我们设置为edge-triggered,即边缘触发。开启这个服务器程序,用telnet连接上,输入一些字符,我们看到,服务器端只从epoll_wait中苏醒过一次,就是第一次有数据可读的时候。
$./epoll02
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
asfafas
第二个程序我们设置为level-triggered,即条件触发。然后按照同样的步骤来一次,观察服务器端,这一次我们可以看到,服务器端不断地从epoll_wait中苏醒,告诉我们有数据需要读取。
$./epoll03
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
...
这就是两者的区别,条件触发的意思是只要满足事件的条件,比如有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。
一般我们认为,边缘触发的效率比条件触发的效率要高,这一点也是epoll的杀手锏之一。
epoll的历史
早在Linux实现epoll之前,Windows系统就已经在1994年引入了IOCP,这是一个异步I/O模型,用来支持高并发的网络I/O,而著名的FreeBSD在2000年引入了Kqueue——一个I/O事件分发框架。
Linux在2002年引入了epoll,不过相关工作的讨论和设计早在2000年就开始了。如果你感兴趣的话,可以 [http://lkml.iu.edu/hypermail/linux/kernel/0010.3/0003.html](http:// <a href=)”>点击这里看一下里面的讨论。
为什么Linux不把FreeBSD的kqueue直接移植过来,而是另辟蹊径创立了epoll呢?
让我们先看下kqueue的用法,kqueue也需要先创建一个名叫kqueue的对象,然后通过这个对象,调用kevent函数增加感兴趣的事件,同时,也是通过这个kevent函数来等待事件的发生。
int kqueue(void);
int kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
void EV_SET(struct kevent *kev, uintptr_t ident, short filter,
u_short flags, u_int fflags, intptr_t data, void *udata);
struct kevent {
uintptr_t ident; /* identifier (e.g., file descriptor) */
short filter; /* filter type (e.g., EVFILT_READ) */
u_short flags; /* action flags (e.g., EV_ADD) */
u_int fflags; /* filter-specific flags */
intptr_t data; /* filter-specific data */
void *udata; /* opaque user data */
};
Linus在他最初的设想里,提到了这么一句话,也就是说他觉得类似select或poll的数组方式是可以的,而队列方式则是不可取的。
So sticky arrays of events are good, while queues are bad. Let’s take that as one of the fundamentals.
在最初的设计里,Linus等于把keque里面的kevent函数拆分了两个部分,一部分负责事件绑定,通过bind_event函数来实现;另一部分负责事件等待,通过get_events来实现。
struct event {
unsigned long id; /* file descriptor ID the event is on */
unsigned long event; /* bitmask of active events */
};
int bind_event(int fd, struct event *event);
int get_events(struct event * event_array, int maxnr, struct timeval *tmout);
和最终的epoll实现相比,前者类似epoll_ctl,后者类似epoll_wait,不过原始的设计里没有考虑到创建epoll句柄,在最终的实现里增加了epoll_create,支持了epoll句柄的创建。
2002年,epoll最终在Linux 2.5.44中首次出现,在2.6中趋于稳定,为Linux的高性能网络I/O画上了一段句号。
总结
Linux中epoll的出现,为高性能网络编程补齐了最后一块拼图。epoll通过改进的接口设计,避免了用户态-内核态频繁的数据拷贝,大大提高了系统性能。在使用epoll的时候,我们一定要理解条件触发和边缘触发两种模式。条件触发的意思是只要满足事件的条件,比如有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。
23 C10K问题: 高并发模型设计。
C10K问题
C10K问题是这样的:如何在一台物理机上同时服务10000个用户?这里C表示并发,10K等于10000。得益于操作系统、编程语言的发展,在现在的条件下,普通用户使用Java Netty、Libevent等框架或库就可以轻轻松松写出支持并发超过10000的服务器端程序,甚至于经过优化之后可以达到十万,乃至百万的并发,但在二十年前,突破C10K问题可费了不少的心思,是一个了不起的突破。
C10K问题是由一个叫Dan Kegel的工程师提出并总结归纳的,你可以通过访问 这个页面 来获得最新有关这方面的信息。
操作系统层面
C10K问题本质上是一个操作系统问题,要在一台主机上同时支持1万个连接,意味着什么呢? 需要考虑哪些方面?
文件句柄
首先,通过前面的介绍,我们知道每个客户连接都代表一个文件描述符,一旦文件描述符不够用了,新的连接就会被放弃,产生如下的错误:
Socket/File:Can't open so many files
在Linux下,单个进程打开的文件句柄数是有限制的,没有经过修改的值一般都是1024。
$ulimit -n
1024
这意味着最多可以服务的连接数上限只能是1024。不过,我们可以对这个值进行修改,比如用 root 权限修改 /etc/sysctl.conf 文件,使得系统可以支持10000个描述符上限。
fs.file-max = 10000
net.ipv4.ip_conntrack_max = 10000
net.ipv4.netfilter.ip_conntrack_max = 10000
系统内存
每个TCP连接占用的资源可不止一个连接套接字这么简单,在前面的章节中,我们多少接触到了类似发送缓冲区、接收缓冲区这些概念。每个TCP连接都需要占用一定的发送缓冲区和接收缓冲区。
这里有一段shell代码,分别显示了在Linux 4.4.0下发送缓冲区和接收缓冲区的值。
$cat /proc/sys/net/ipv4/tcp_wmem
4096 16384 4194304
$ cat /proc/sys/net/ipv4/tcp_rmem
4096 87380 6291456
这三个值分别表示了最小分配值、默认分配值和最大分配值。按照默认分配值计算,一万个连接需要的内存消耗为:
发送缓冲区: 16384*10000 = 160M bytes
接收缓冲区: 87380*10000 = 880M bytes
当然,我们的应用程序本身也需要一定的缓冲区来进行数据的收发,为了方便,我们假设每个连接需要128K的缓冲区,那么1万个链接就需要大约1.2G的应用层缓冲。
这样,我们可以得出大致的结论,支持1万个并发连接,内存并不是一个巨大的瓶颈。
网络带宽
假设1万个连接,每个连接每秒传输大约1KB的数据,那么带宽需要 10000 x 1KB/s x8 = 80Mbps。这在今天的动辄万兆网卡的时代简直小菜一碟。
C10K问题解决之道
通过前面我们对操作系统层面的资源分析,可以得出一个结论,在系统资源层面,C10K问题是可以解决的。
但是,能解决并不意味着可以很好地解决。我们知道,在网络编程中,涉及到频繁的用户态-内核态数据拷贝,设计不够好的程序可能在低并发的情况下工作良好,一旦到了高并发情形,其性能可能呈现出指数级别的损失。
举一个例子,如果没有考虑好C10K问题,一个基于select的经典程序可能在一台服务器上可以很好处理1000的并发用户,但是在性能2倍的服务器上,却往往并不能很好地处理2000的并发用户。
要想解决C10K问题,就需要从两个层面上来统筹考虑。
第一个层面,应用程序如何和操作系统配合,感知I/O事件发生,并调度处理在上万个套接字上的 I/O操作?前面讲过的阻塞I/O、非阻塞I/O讨论的就是这方面的问题。
第二个层面,应用程序如何分配进程、线程资源来服务上万个连接?这在接下来会详细讨论。
这两个层面的组合就形成了解决C10K问题的几种解法方案,下面我们一起来看。
阻塞I/O + 进程
这种方式最为简单直接,每个连接通过fork派生一个子进程进行处理,因为一个独立的子进程负责处理了该连接所有的I/O,所以即便是阻塞I/O,多个连接之间也不会互相影响。
这个方法虽然简单,但是效率不高,扩展性差,资源占用率高。
下面的伪代码描述了使用阻塞I/O,为每个连接fork一个进程的做法:
do{
accept connections
fork for conneced connection fd
process_run(fd)
}
虽然这个方式比较传统, 但是可以很好地帮我们理解父子进程、僵尸进程等,我们将在下一讲中详细讲一下如何使用这个技术设计一个服务器端程序。
阻塞I/O + 线程
进程模型占用的资源太大,幸运的是,还有一种轻量级的资源模型,这就是线程。
通过为每个连接调用pthread_create创建一个单独的线程,也可以达到上面使用进程的效果。
do{
accept connections
pthread_create for conneced connection fd
thread_run(fd)
}while(true)
因为线程的创建是比较消耗资源的,况且不是每个连接在每个时刻都需要服务,因此,我们可以预先通过创建一个线程池,并在多个连接中复用线程池来获得某种效率上的提升。
create thread pool
do{
accept connections
get connection fd
push_queue(fd)
}while(true)
非阻塞I/O + readiness notification + 单线程
应用程序其实可以采取轮询的方式来对保存的套接字集合进行挨个询问,从而找出需要进行I/O处理的套接字,像给出的伪码一样,其中is_readble和is_writeable可以通过对套接字调用read或write操作来判断。
for fd in fdset{
if(is_readable(fd) == true){
handle_read(fd)
}else if(is_writeable(fd)==true){
handle_write(fd)
}
}
但这个方法有一个问题,如果这个fdset有一万个之多,每次循环判断都会消耗大量的CPU时间,而且极有可能在一个循环之内,没有任何一个套接字准备好可读,或者可写。
既然这样,CPU的消耗太大,那么干脆让操作系统来告诉我们哪个套接字可以读,哪个套接字可以写。在这个结果发生之前,我们把CPU的控制权交出去,让操作系统来把宝贵的CPU时间调度给那些需要的进程,这就是select、poll这样的I/O分发技术。
于是,程序就长成了这样:
do {
poller.dispatch()
for fd in registered_fdset{
if(is_readable(fd) == true){
handle_read(fd)
}else if(is_writeable(fd)==true){
handle_write(fd)
}
}while(ture)
第27讲中,我将会讨论这样的技术实现。
但是,这样的方法需要每次dispatch之后,对所有注册的套接字进行逐个排查,效率并不是最高的。如果dispatch调用返回之后只提供有 I/O事件或者I/O变化的套接字,这样排查的效率不就高很多了么?这就是前面我们讲到的epoll设计。
于是,基于epoll的程序就长成了这样:
do {
poller.dispatch()
for fd_event in active_event_set{
if(is_readable_event(fd_event) == true){
handle_read(fd_event)
}else if(is_writeable_event(fd_event)==true){
handle_write(fd_event)
}
}while(ture)
Linux是互联网的基石,epoll也就成为了解决C10K问题的钥匙。FreeBSD上的kqueue,Windows上的IOCP,Solaris上的/dev/poll,这些不同的操作系统提供的功能都是为了解决C10K问题。
非阻塞I/O + readiness notification +多线程
前面的做法是所有的I/O事件都在一个线程里分发,如果我们把线程引入进来,可以利用现代CPU多核的能力,让每个核都可以作为一个I/O分发器进行I/O事件的分发。
这就是所谓的主从reactor模式。基于epoll/poll/select的I/O事件分发器可以叫做reactor,也可以叫做事件驱动,或者事件轮询(eventloop)。
我没有把基于select/poll的所谓“level triggered”通知机制和基于epoll的“edge triggered”通知机制分开(C10K问题总结里是分开的),我觉得这只是reactor机制的实现高效性问题,而不是编程模式的巨大区别。
从27讲开始,我们就会引入reactor模式,并使用一个自己编写的简单reactor框架来逐渐掌握它。
异步I/O+ 多线程
异步非阻塞 I/O 模型是一种更为高效的方式,当调用结束之后,请求立即返回,由操作系统后台完成对应的操作,当最终操作完成,就会产生一个信号,或者执行一个回调函数来完成I/O处理。
这就涉及到了Linux下的aio机制,我们在第30讲对Linux下的aio机制进行简单的讨论。
总结
支持单机1万并发的问题被称为C10K问题,为了解决C10K问题,需要重点考虑两个方面的问题:
- 如何和操作系统配合,感知I/O事件的发生?
- 如何分配和使用进程、线程资源来服务上万个连接?
基于这些组合,产生了一些通用的解决方法,在Linux下,解决高性能问题的利器是非阻塞I/O加上epoll机制,再利用多线程能力。
思考题
最后给你布置两道思考题:
第一道,查询一下资料,看看著名的Netty网络编程库,用的是哪一种C10K解决方法呢?
涉及netty的三种模型,常用的是主从reacter模型,分别由eventloopgroup线程池来处理连接和IO事件,底层就是epoll。
第二道,现在大家又把眼光放到了更有挑战性的C10M问题,即单机处理千万级并发,挑战和瓶颈又在哪里呢?
要实现 C10M ,就不只是增加物理资源,或者优化内核和应用程序可以解决的问题了。这时候,就需要用 XDP 的方式,在内核协议栈之前处理网络包;或者用 DPDK 直接跳过网络协议栈,在用户空间通过轮询的方式直接处理网络包。
24 使用阻塞I/O和进程模型:最传统的方式
父进程和子进程
我们知道,进程是程序执行的最小单位,一个进程有完整的地址空间、程序计数器等,如果想创建一个新的进程,使用函数fork就可以。
pid_t fork(void)
返回:在子进程中为0,在父进程中为子进程ID,若出错则为-1
如果你是第一次使用这个函数,你会觉得难以理解的地方在于,虽然我们的程序调用fork一次,它却在父、子进程里各返回一次。在调用该函数的进程(即为父进程)中返回的是新派生的进程ID号,在子进程中返回的值为0。想要知道当前执行的进程到底是父进程,还是子进程,只能通过返回值来进行判断。
fork函数实现的时候,实际上会把当前父进程的所有相关值都克隆一份,包括地址空间、打开的文件描述符、程序计数器等,就连执行代码也会拷贝一份,新派生的进程的表现行为和父进程近乎一样,就好像是派生进程调用过fork函数一样。为了区别两个不同的进程,实现者可以通过改变fork函数的栈空间值来判断,对应到程序中就是返回值的不同。
这样就形成了编程范式:
if(fork() == 0){
do_child_process(); //子进程执行代码
}else{
do_parent_process(); //父进程执行代码
}
当一个子进程退出时,系统内核还保留了该进程的若干信息,比如退出状态。这样的进程如果不回收,就会变成僵尸进程。在Linux下,这样的“僵尸”进程会被挂到进程号为1的init进程上。所以,由父进程派生出来的子进程,也必须由父进程负责回收,否则子进程就会变成僵尸进程。僵尸进程会占用不必要的内存空间,如果量多到了一定数量级,就会耗尽我们的系统资源。
有两种方式可以在子进程退出后回收资源,分别是调用wait和waitpid函数。
pid_t wait(int *statloc);
pid_t waitpid(pid_t pid, int *statloc, int options);
函数wait和waitpid都可以返回两个值,一个是函数返回值,表示已终止子进程的进程ID号,另一个则是通过statloc指针返回子进程终止的实际状态。这个状态可能的值为正常终止、被信号杀死、作业控制停止等。
如果没有已终止的子进程,而是有一个或多个子进程在正常运行,那么wait将阻塞,直到第一个子进程终止。
waitpid可以认为是wait函数的升级版,它的参数更多,提供的控制权也更多。pid参数允许我们指定任意想等待终止的进程ID,值-1表示等待第一个终止的子进程。options参数给了我们更多的控制选项。
处理子进程退出的方式一般是注册一个信号处理函数,捕捉信号SIGCHLD信号,然后再在信号处理函数里调用waitpid函数来完成子进程资源的回收。SIGCHLD是子进程退出或者中断时由内核向父进程发出的信号,默认这个信号是忽略的。所以,如果想在子进程退出时能回收它,需要像下面一样,注册一个SIGCHLD函数。
signal(SIGCHLD, sigchld_handler);
阻塞I/O的进程模型
为了说明使用阻塞I/O和进程模型,我们假设有两个客户端,服务器初始监听在套接字lisnted_fd上。当第一个客户端发起连接请求,连接建立后产生出连接套接字,此时,父进程派生出一个子进程,在子进程中,使用连接套接字和客户端通信,因此子进程不需要关心监听套接字,只需要关心连接套接字;父进程则相反,将客户服务交给子进程来处理,因此父进程不需要关心连接套接字,只需要关心监听套接字。
这张图描述了从连接请求到连接建立,父进程派生子进程为客户服务。
假设父进程之后又接收了新的连接请求,从accept调用返回新的已连接套接字,父进程又派生出另一个子进程,这个子进程用第二个已连接套接字为客户端服务。
这张图同样描述了这个过程。
现在,服务器端的父进程继续监听在套接字上,等待新的客户连接到来;两个子进程分别使用两个不同的连接套接字为两个客户服务。
程序讲解
我们将前面的内容串联起来,就是下面完整的一个基于进程模型的服务器端程序。
#include "lib/common.h"
#define MAX_LINE 4096
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
void child_run(int fd) {
char outbuf[MAX_LINE + 1];
size_t outbuf_used = 0;
ssize_t result;
while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
if (result == 0) {
break;
} else if (result == -1) {
perror("read");
break;
}
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}
if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}
void sigchld_handler(int sig) {
while (waitpid(-1, 0, WNOHANG) > 0);
return;
}
int main(int c, char **v) {
int listener_fd = tcp_server_listen(SERV_PORT);
signal(SIGCHLD, sigchld_handler);
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
exit(1);
}
if (fork() == 0) {
close(listener_fd);
child_run(fd);
exit(0);
} else {
close(fd);
}
}
return 0;
}
程序的48行注册了一个信号处理函数,用来回收子进程资源。函数sigchld_handler,在一个循环体内调用了waitpid函数,以便回收所有已终止的子进程。这里选项WNOHANG用来告诉内核,即使还有未终止的子进程也不要阻塞在waitpid上。注意这里不可以使用wait,因为wait函数在有未终止子进程的情况下,没有办法不阻塞。
程序的58-62行,通过判断fork的返回值为0,进入子进程处理逻辑。按照前面的讲述,子进程不需要关心监听套接字,故而在这里关闭掉监听套接字listen_fd,之后调用child_run函数使用已连接套接字fd来进行数据读写。第63行,进入的是父进程处理逻辑,父进程不需要关心连接套接字,所以在这里关闭连接套接字。
还记得 第11讲 中讲到的close函数吗?我们知道,从父进程派生出的子进程,同时也会复制一份描述字,也就是说,连接套接字和监听套接字的引用计数都会被加1,而调用close函数则会对引用计数进行减1操作,这样在套接字引用计数到0时,才可以将套接字资源回收。所以,这里的close函数非常重要,缺少了它们,就会引起服务器端资源的泄露。
child_run函数中,通过一个while循环来不断和客户端进行交互,依次读出字符之后,进行了简单的转码,如果读到回车符,则将转码之后的结果通过连接套接字发送出去。这样的回显方式,显得比较有“交互感”。
实验
我们启动该服务器,监听在对应的端口43211上。
./fork01
再启动两个telnet客户端,连接到43211端口,每次通过标准输入和服务器端传输一些数据,我们看到,服务器和客户端的交互正常。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
afasfa
nsnfsn
]
telnet> quit
Connection closed.
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
agasgasg
ntnftnft
]
telnet> quit
Connection closed.
客户端退出,服务器端也在正常工作,此时如果再通过telnet建立新的连接,客户端和服务器端的数据传输也会正常进行。
至此,我们构建了一个完整的服务器端程序,可以并发处理多个不同的客户连接,互不干扰。
总结
使用阻塞I/O和进程模型,为每一个连接创建一个独立的子进程来进行服务,是一个非常简单有效的实现方式,这种方式可能很难满足高性能程序的需求,但好处在于实现简单。在实现这样的程序时,我们需要注意两点:
- 要注意对套接字的关闭梳理;
- 要注意对子进程进行回收,避免产生不必要的僵尸进程。
25 使用阻塞I/O和线程模型:换一种轻量的方式
使用进程模型来处理用户连接请求,进程切换上下文的代价是比较高的,幸运的是,有一种轻量级的模型可以处理多用户连接请求,这就是线程模型。
线程(thread)是运行在进程中的一个“逻辑流”,现代操作系统都允许在单进程中运行多个线程。线程由操作系统内核管理。每个线程都有自己的上下文(context),包括一个可以唯一标识线程的ID(thread ID,或者叫tid)、栈、程序计数器、寄存器等。在同一个进程中,所有的线程共享该进程的整个虚拟地址空间,包括代码、数据、堆、共享库等。
在前面的程序中,我们没有显式使用线程,但这不代表线程没有发挥作用。实际上,每个进程一开始都会产生一个线程,一般被称为主线程,主线程可以再产生子线程,这样的主线程-子线程对可以叫做一个对等线程。
你可能会问,既然可以使用多进程来处理并发,为什么还要使用多线程模型呢?
简单来说,在同一个进程下,线程上下文切换的开销要比进程小得多。怎么理解线程上下文呢?我们的代码被CPU执行的时候,是需要一些数据支撑的,比如程序计数器告诉CPU代码执行到哪里了,寄存器里存了当前计算的一些中间值,内存里放置了一些当前用到的变量等,从一个计算场景,切换到另外一个计算场景,程序计数器、寄存器等这些值重新载入新场景的值,就是线程的上下文切换。
POSIX线程模型
POSIX线程是现代UNIX系统提供的处理线程的标准接口。POSIX定义的线程函数大约有60多个,这些函数可以帮助我们创建线程、回收线程。接下来我们先看一个简单的例子程序。
int another_shared = 0;
void thread_run(void *arg) {
int *calculator = (int *) arg;
printf("hello, world, tid == %d \n", pthread_self());
for (int i = 0; i < 1000; i++) {
*calculator += 1;
another_shared += 1;
}
}
int main(int c, char **v) {
int calculator;
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, NULL, thread_run, &calculator);
pthread_create(&tid2, NULL, thread_run, &calculator);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
printf("calculator is %d \n", calculator);
printf("another_shared is %d \n", another_shared);
}
thread_helloworld程序中,主线程依次创建了两个子线程,然后等待这两个子线程处理完毕之后终止。每个子线程都在对两个共享变量进行计算,最后在主线程中打印出最后的计算结果。
程序的第18和19行分别调用了pthread_create创建了两个线程,每个线程的入口都是thread_run函数,这里我们使用了calculator这个全局变量,并且通过传地址指针的方式,将这个值传给了thread_run函数。当调用pthread_create结束,子线程会立即执行,主线程在此后调用了pthread_join函数等待子线程结束。
运行这个程序,很幸运,计算的结果是正确的。
$./thread-helloworld
hello, world, tid == 125607936
hello, world, tid == 126144512
calculator is 2000
another_shared is 2000
主要线程函数
创建线程
正如前面看到,通过调用pthread_create函数来创建一个线程。这个函数的原型如下:
int pthread_create(pthread_t *tid, const pthread_attr_t *attr,
void *(*func)(void *), void *arg);
返回:若成功则为0,若出错则为正的Exxx值
每个线程都有一个线程ID(tid)唯一来标识,其数据类型为pthread_t,一般是unsigned int。pthread_create函数的第一个输出参数tid就是代表了线程ID,如果创建线程成功,tid就返回正确的线程ID。
每个线程都会有很多属性,比如优先级、是否应该成为一个守护进程等,这些值可以通过pthread_attr_t来描述,一般我们不会特殊设置,可以直接指定这个参数为NULL。
第三个参数为新线程的入口函数,该函数可以接收一个参数arg,类型为指针,如果我们想给线程入口函数传多个值,那么需要把这些值包装成一个结构体,再把这个结构体的地址作为pthread_create的第四个参数,在线程入口函数内,再将该地址转为该结构体的指针对象。
在新线程的入口函数内,可以执行pthread_self函数返回线程tid。
pthread_t pthread_self(void)
终止线程
终止一个线程最直接的方法是在父线程内调用以下函数:
void pthread_exit(void *status)
当调用这个函数之后,父线程会等待其他所有的子线程终止,之后父线程自己终止。
当然,如果一个子线程入口函数直接退出了,那么子线程也就自然终止了。所以,绝大多数的子线程执行体都是一个无限循环。
也可以通过调用pthread_cancel来主动终止一个子线程,和pthread_exit不同的是,它可以指定某个子线程终止。
int pthread_cancel(pthread_t tid)
回收已终止线程的资源
我们可以通过调用pthread_join回收已终止线程的资源:
int pthread_join(pthread_t tid, void ** thread_return)
当调用pthread_join时,主线程会阻塞,直到对应tid的子线程自然终止。和pthread_cancel不同的是,它不会强迫子线程终止。
分离线程
一个线程的重要属性是可结合的,或者是分离的。一个可结合的线程是能够被其他线程杀死和回收资源的;而一个分离的线程不能被其他线程杀死或回收资源。一般来说,默认的属性是可结合的。
我们可以通过调用pthread_detach函数可以分离一个线程:
int pthread_detach(pthread_t tid)
在高并发的例子里,每个连接都由一个线程单独处理,在这种情况下,服务器程序并不需要对每个子线程进行终止,这样的话,每个子线程可以在入口函数开始的地方,把自己设置为分离的,这样就能在它终止后自动回收相关的线程资源了,就不需要调用pthread_join函数了。
每个连接一个线程处理
接下来,我们改造一下服务器端程序。我们的目标是这样:每次有新的连接到达后,创建一个新线程,而不是用新进程来处理它。
#include "lib/common.h"
extern void loop_echo(int);
void thread_run(void *arg) {
pthread_detach(pthread_self());
int fd = (int) arg;
loop_echo(fd);
}
int main(int c, char **v) {
int listener_fd = tcp_server_listen(SERV_PORT);
pthread_t tid;
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
pthread_create(&tid, NULL, &thread_run, (void *) fd);
}
}
return 0;
}
这个程序的第18行阻塞调用在accept上,一旦有新连接建立,阻塞调用返回,调用pthread_create创建一个子线程来处理这个连接。
描述连接最主要的是连接描述字,这里通过强制把描述字转换为void *指针的方式,完成了传值。如果你对这部分有点不理解,建议看一下C语言相关的指针部分内容。我们这里可以简单总结一下,虽然传的是一个指针,但是这个指针里存放的并不是一个地址,而是连接描述符的数值。
新线程入口函数thread_run里,第6行使用了pthread_detach方法,将子线程转变为分离的,也就意味着子线程独自负责线程资源回收。第7行,强制将指针转变为描述符数据,和前面将描述字转换为void *指针对应,第8行调用loop_echo方法处理这个连接的数据读写。
loop_echo的程序如下,在接收客户端的数据之后,再编码回送出去。
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
void loop_echo(int fd) {
char outbuf[MAX_LINE + 1];
size_t outbuf_used = 0;
ssize_t result;
while (1) {
char ch;
result = recv(fd, &ch, 1, 0);
//断开连接或者出错
if (result == 0) {
break;
} else if (result == -1) {
error(1, errno, "read error");
break;
}
if (outbuf_used < sizeof(outbuf)) {
outbuf[outbuf_used++] = rot13_char(ch);
}
if (ch == '\n') {
send(fd, outbuf, outbuf_used, 0);
outbuf_used = 0;
continue;
}
}
}
运行这个程序之后,开启多个telnet客户端,可以看到这个服务器程序可以处理多个并发连接并回送数据。单独一个连接退出也不会影响其他连接的数据收发。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaa
nnn
^]
telnet> quit
Connection closed.
构建线程池处理多个连接
上面的服务器端程序虽然可以正常工作,不过它有一个缺点,那就是如果并发连接过多,就会引起线程的频繁创建和销毁。虽然线程切换的上下文开销不大,但是线程创建和销毁的开销却是不小的。
能不能对这个程序进行一些优化呢?
我们可以使用预创建线程池的方式来进行优化。在服务器端启动时,可以先按照固定大小预创建出多个线程,当有新连接建立时,往连接字队列里放置这个新连接描述字,线程池里的线程负责从连接字队列里取出连接描述字进行处理。
这个程序的关键是连接字队列的设计,因为这里既有往这个队列里放置描述符的操作,也有从这个队列里取出描述符的操作。
对此,需要引入两个重要的概念,一个是锁mutex,一个是条件变量condition。锁很好理解,加锁的意思就是其他线程不能进入;条件变量则是在多个线程需要交互的情况下,用来线程间同步的原语。
//定义一个队列
typedef struct {
int number; //队列里的描述字最大个数
int *fd; //这是一个数组指针
int front; //当前队列的头位置
int rear; //当前队列的尾位置
pthread_mutex_t mutex; //锁
pthread_cond_t cond; //条件变量
} block_queue;
//初始化队列
void block_queue_init(block_queue *blockQueue, int number) {
blockQueue->number = number;
blockQueue->fd = calloc(number, sizeof(int));
blockQueue->front = blockQueue->rear = 0;
pthread_mutex_init(&blockQueue->mutex, NULL);
pthread_cond_init(&blockQueue->cond, NULL);
}
//往队列里放置一个描述字fd
void block_queue_push(block_queue *blockQueue, int fd) {
//一定要先加锁,因为有多个线程需要读写队列
pthread_mutex_lock(&blockQueue->mutex);
//将描述字放到队列尾的位置
blockQueue->fd[blockQueue->rear] = fd;
//如果已经到最后,重置尾的位置
if (++blockQueue->rear == blockQueue->number) {
blockQueue->rear = 0;
}
printf("push fd %d", fd);
//通知其他等待读的线程,有新的连接字等待处理
pthread_cond_signal(&blockQueue->cond);
//解锁
pthread_mutex_unlock(&blockQueue->mutex);
}
//从队列里读出描述字进行处理
int block_queue_pop(block_queue *blockQueue) {
//加锁
pthread_mutex_lock(&blockQueue->mutex);
//判断队列里没有新的连接字可以处理,就一直条件等待,直到有新的连接字入队列
while (blockQueue->front == blockQueue->rear)
pthread_cond_wait(&blockQueue->cond, &blockQueue->mutex);
//取出队列头的连接字
int fd = blockQueue->fd[blockQueue->front];
//如果已经到最后,重置头的位置
if (++blockQueue->front == blockQueue->number) {
blockQueue->front = 0;
}
printf("pop fd %d", fd);
//解锁
pthread_mutex_unlock(&blockQueue->mutex);
//返回连接字
return fd;
}
这里有block_queue相关的定义和实现,并在关键的地方加了一些注释,有几个地方需要特别注意:
第一是记得对操作进行加锁和解锁,这里是通过pthread_mutex_lock和pthread_mutex_unlock来完成的。
第二是当工作线程没有描述字可用时,需要等待,第43行通过调用pthread_cond_wait,所有的工作线程等待有新的描述字可达。第32行,主线程通知工作线程有新的描述符需要服务。
服务器端程序如下:
void thread_run(void *arg) {
pthread_t tid = pthread_self();
pthread_detach(tid);
block_queue *blockQueue = (block_queue *) arg;
while (1) {
int fd = block_queue_pop(blockQueue);
printf("get fd in thread, fd==%d, tid == %d", fd, tid);
loop_echo(fd);
}
}
int main(int c, char **v) {
int listener_fd = tcp_server_listen(SERV_PORT);
block_queue blockQueue;
block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);
thread_array = calloc(THREAD_NUMBER, sizeof(Thread));
int i;
for (i = 0; i < THREAD_NUMBER; i++) {
pthread_create(&(thread_array[i].thread_tid), NULL, &thread_run, (void *) &blockQueue);
}
while (1) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
block_queue_push(&blockQueue, fd);
}
}
return 0;
}
有了描述字队列,主程序变得非常简洁。第19-23行预创建了多个线程,组成了一个线程池。28-32行在新连接建立后,将连接描述字加入到队列中。
7-9行是工作线程的主循环,从描述字队列中取出描述字,对这个连接进行服务处理。
同样的,运行这个程序之后,开启多个telnet客户端,可以看到这个服务器程序可以正常处理多个并发连接并回显。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaa
nnn
^]
telnet> quit
Connection closed.
和前面的程序相比,线程创建和销毁的开销大大降低,但因为线程池大小固定,又因为使用了阻塞套接字,肯定会出现有连接得不到及时服务的场景。这个问题的解决还是要回到我在开篇词里提到的方案上来,多路I/O复用加上线程来处理,仅仅使用阻塞I/O模型和线程是没有办法达到极致的高并发处理能力。
总结
使用了线程来构建服务器端程序。一种是每次动态创建线程,另一种是使用线程池提高效率。和进程相比,线程的语义更轻量,使用的场景也更多。
26 使用poll单线程处理所有I-O事件
使用fork进程和pthread线程来处理多并发,这两种技术使用简单,但是性能却会随着并发数的上涨而快速下降,并不能满足极端高并发的需求。这个时候我们需要寻找更好的解决之道,这个解决之道基本的思想就是I/O事件分发。
关于代码,可以去 GitHub 上查看或下载完整代码。
重温事件驱动
基于事件的程序设计: GUI、Web
事件驱动的好处是占用资源少,效率高,可扩展性强,是支持高性能高并发的不二之选。
熟悉GUI编程的话,会知道,GUI设定了一系列的控件,如Button、Label、文本框等,当我们设计基于控件的程序时,一般都会给Button的点击安排一个函数,类似这样:
//按钮点击的事件处理
void onButtonClick(){
}
这个设计的思想是,一个无限循环的事件分发线程在后台运行,一旦用户在界面上产生了某种操作,例如点击了某个Button,或者点击了某个文本框,一个事件会被产生并放置到事件队列中,这个事件会有一个类似前面的onButtonClick回调函数。事件分发线程的任务,就是为每个发生的事件找到对应的事件回调函数并执行它。这样,一个基于事件驱动的GUI程序就可以完美地工作了。
还有一个类似的例子是Web编程领域。同样的,Web程序会在Web界面上放置各种界面元素,例如Label、文本框、按钮等,和GUI程序类似,给感兴趣的界面元素设计JavaScript回调函数,当用户操作时,对应的JavaScript回调函数会被执行,完成某个计算或操作。这样,一个基于事件驱动的Web程序就可以在浏览器中完美地工作了。
事件驱动模型,也被叫做反应堆模型(reactor),或者是Event loop模型。这个模型的核心有两点。
第一,它存在一个无限循环的事件分发线程,或者叫做reactor线程、Event loop线程。这个事件分发线程的背后,就是poll、epoll等I/O分发技术的使用。
第二,所有的I/O操作都可以抽象成事件,每个事件必须有回调函数来处理。acceptor上有连接建立成功、已连接套接字上发送缓冲区空出可以写、通信管道pipe上有数据可以读,这些都是一个个事件,通过事件分发,这些事件都可以一一被检测,并调用对应的回调函数加以处理。
几种I/O模型和线程模型设计
任何一个网络程序,所做的事情可以总结成下面几种:
- read:从套接字收取数据;
- decode:对收到的数据进行解析;
- compute:根据解析之后的内容,进行计算和处理;
- encode:将处理之后的结果,按照约定的格式进行编码;
- send:最后,通过套接字把结果发送出去。
这几个过程和套接字最相关的是read和send这两种。接下来,总结一下已经学过的几种支持多并发的网络编程技术,引出使用poll单线程处理所有I/O。
fork
使用fork来创建子进程,为每个到达的客户连接服务。这张图很好地解释了这个设计模式,可想而知的是,随着客户数的变多,fork的子进程也越来越多,即使客户和服务器之间的交互比较少,这样的子进程也不能被销毁,一直需要存在。使用fork的方式处理非常简单,它的缺点是处理效率不高,fork子进程的开销太大。
pthread
第26讲中,我们使用了pthread_create创建子线程,因为线程是比进程更轻量级的执行单位,所以它的效率相比fork的方式,有一定的提高。但是,每次创建一个线程的开销仍然是不小的,因此,引入了线程池的概念,预先创建出一个线程池,在每次新连接达到时,从线程池挑选出一个线程为之服务,很好地解决了线程创建的开销。但是,这个模式还是没有解决空闲连接占用资源的问题,如果一个连接在一定时间内没有数据交互,这个连接还是要占用一定的线程资源,直到这个连接消亡为止。
single reactor thread
前面讲到,事件驱动模式是解决高性能、高并发比较好的一种模式。为什么呢?
因为这种模式是符合大规模生产的需求的。我们的生活中遍地都是类似的模式。比如你去咖啡店喝咖啡,你点了一杯咖啡在一旁喝着,服务员也不会管你,等你有续杯需求的时候,再去和服务员提(触发事件),服务员满足了你的需求,你就继续可以喝着咖啡玩手机。整个柜台的服务方式就是一个事件驱动的方式。
这里有一张图,解释了这一讲的设计模式。一个reactor线程上同时负责分发acceptor的事件、已连接套接字的I/O事件。
single reactor thread + worker threads
但是上述的设计模式有一个问题,和I/O事件处理相比,应用程序的业务逻辑处理是比较耗时的,比如XML文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,这些工作相对而言比较独立,它们会拖慢整个反应堆模式的执行效率。
所以,将这些decode、compute、enode型工作放置到另外的线程池中,和反应堆线程解耦,是一个比较明智的选择。反应堆线程只负责处理I/O相关的工作,业务逻辑相关的工作都被裁剪成一个一个的小任务,放到线程池里由空闲的线程来执行。当结果完成后,再交给反应堆线程,由反应堆线程通过套接字将结果发送出去。
样例程序
使用这个网络编程框架的样例程序如下:
#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
//连接建立之后的callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {
printf("connection completed\n");
return 0;
}
//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
printf("get message from tcp connection %s\n", tcpConnection->name);
printf("%s", input->data);
struct buffer *output = buffer_new();
int size = buffer_readable_size(input);
for (int i = 0; i < size; i++) {
buffer_append_char(output, rot13_char(buffer_read_char(input)));
}
tcp_connection_send_buffer(tcpConnection, output);
return 0;
}
//数据通过buffer写完之后的callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {
printf("write completed\n");
return 0;
}
//连接关闭之后的callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {
printf("connection closed\n");
return 0;
}
int main(int c, char **v) {
//主线程event_loop
struct event_loop *eventLoop = event_loop_init();
//初始化acceptor
struct acceptor *acceptor = acceptor_init(SERV_PORT);
//初始tcp_server,可以指定线程数目,如果线程是0,就只有一个线程,既负责acceptor,也负责I/O
struct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,
onWriteCompleted, onConnectionClosed, 0);
tcp_server_start(tcpServer);
// main thread for acceptor
event_loop_run(eventLoop);
}
这个程序的main函数部分只有几行, 因为是第一次接触到,稍微展开介绍一下。
第49行创建了一个event_loop,即reactor对象,这个event_loop和线程相关联,每个event_loop在线程里执行的是一个无限循环,以便完成事件的分发。
第52行初始化了acceptor,用来监听在某个端口上。
第55行创建了一个TCPServer,创建的时候可以指定线程数目,这里线程是0,就只有一个线程,既负责acceptor的连接处理,也负责已连接套接字的I/O处理。这里比较重要的是传入了几个回调函数,分别对应了连接建立完成、数据读取完成、数据发送完成、连接关闭完成几种操作,通过回调函数,让业务程序可以聚焦在业务层开发。
第57行开启监听。
第60行运行event_loop无限循环,等待acceptor上有连接建立、新连接上有数据可读等。
样例程序结果
运行这个服务器程序,开启两个telnet客户端,我们看到服务器端的输出如下:
$./poll-server-onethread
[msg] set poll as dispatcher
[msg] add channel fd == 4, main thread
[msg] poll added channel fd==4
[msg] add channel fd == 5, main thread
[msg] poll added channel fd==5
[msg] event loop run, main thread
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 6
connection completed
[msg] add channel fd == 6, main thread
[msg] poll added channel fd==6
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
get message from tcp connection connection-6
afadsfaf
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
get message from tcp connection connection-6
afadsfaf
fdafasf
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 7
connection completed
[msg] add channel fd == 7, main thread
[msg] poll added channel fd==7
[msg] get message channel i==3, fd==7
[msg] activate channel fd == 7, revents=2, main thread
get message from tcp connection connection-7
sfasggwqe
[msg] get message channel i==3, fd==7
[msg] activate channel fd == 7, revents=2, main thread
[msg] poll delete channel fd==7
connection closed
[msg] get message channel i==2, fd==6
[msg] activate channel fd == 6, revents=2, main thread
[msg] poll delete channel fd==6
connection closed
这里自始至终都只有一个main thread在工作,可见,单线程的reactor处理多个连接时也可以表现良好。
总结
这一讲我们总结了几种不同的I/O模型和线程模型设计,并比较了各自不同的优缺点。这一讲使用了poll来处理所有的I/O事件,我们将会看到如何把acceptor的连接事件和已连接套接字的I/O事件交由不同的线程处理,而这个分离,不过是在应用程序层简单的参数配置而已。
27 I/O多路复用进阶:子线程使用poll处理连接I/O事件
在前面引入了reactor反应堆模式,并且让reactor反应堆同时分发Acceptor上的连接建立事件和已建立连接的I/O事件。在发起连接请求的客户端非常多的情况下,有一个地方是有问题的,那就是单reactor线程既分发连接建立,又分发已建立连接的I/O,有点忙不过来,在实战中的表现可能就是客户端连接成功率偏低。
再者,新的硬件技术不断发展,多核多路CPU已经得到极大的应用,单reactor反应堆模式看着大把的CPU资源却不用,有点可惜。
将acceptor上的连接建立事件和已建立连接的I/O事件分离,形成所谓的主-从reactor模式。
主-从reactor模式
下面的这张图描述了主-从reactor模式是如何工作的。
主-从这个模式的核心思想是,主反应堆线程只负责分发Acceptor连接建立,已连接套接字上的I/O事件交给sub-reactor负责分发。其中sub-reactor的数量,可以根据CPU的核数来灵活设置。
比如一个四核CPU,我们可以设置sub-reactor为4。相当于有4个身手不凡的反应堆线程同时在工作,这大大增强了I/O分发处理的效率。而且,同一个套接字事件分发只会出现在一个反应堆线程中,这会大大减少并发处理的锁开销。
我们的主反应堆线程一直在感知连接建立的事件,如果有连接成功建立,主反应堆线程通过accept方法获取已连接套接字,接下来会按照一定的算法选取一个从反应堆线程,并把已连接套接字加入到选择好的从反应堆线程中。
主反应堆线程唯一的工作,就是调用accept获取已连接套接字,以及将已连接套接字加入到从反应堆线程中。不过,这里还有一个小问题,主反应堆线程和从反应堆线程,是两个不同的线程,如何把已连接套接字加入到另外一个线程中呢?更令人沮丧的是,此时从反应堆线程或许处于事件分发的无限循环之中,在这种情况下应该怎么办呢?
这是高性能网络程序框架要解决的问题。
主-从reactor+worker threads模式
如果说主-从reactor模式解决了I/O分发的高效率问题,那么work threads就解决了业务逻辑和I/O分发之间的耦合问题。把这两个策略组装在一起,就是实战中普遍采用的模式。大名鼎鼎的Netty,就是把这种模式发挥到极致的一种实现。不过要注意Netty里面提到的worker线程,其实就是我们这里说的从reactor线程,并不是处理具体业务逻辑的worker线程。
下面贴的一段代码就是常见的Netty初始化代码,这里Boss Group就是acceptor主反应堆,workerGroup就是从反应堆。而处理业务逻辑的线程,通常都是通过使用Netty的程序开发者进行设计和定制,一般来说,业务逻辑线程需要从workerGroup线程中分离,以便支持更高的并发度。
public final class TelnetServer {
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8992" : "8023"));
public static void main(String[] args) throws Exception {
//产生一个reactor线程,只负责accetpor的对应处理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//产生一个reactor线程,负责处理已连接套接字的I/O事件分发
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
//标准的Netty初始,通过serverbootstrap完成线程池、channel以及对应的handler设置,注意这里讲bossGroup和workerGroup作为参数设置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer(sslCtx));
//开启两个reactor线程无限循环处理
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这张图解释了主-从反应堆下加上worker线程池的处理模式。
主-从反应堆跟上面介绍的做法是一样的。和上面不一样的是,这里将decode、compute、encode等CPU密集型的工作从I/O线程中拿走,这些工作交给worker线程池来处理,而且这些工作拆分成了一个个子任务进行。encode之后完成的结果再由sub-reactor的I/O线程发送出去。
样例程序
#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
//连接建立之后的callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {
printf("connection completed\n");
return 0;
}
//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
printf("get message from tcp connection %s\n", tcpConnection->name);
printf("%s", input->data);
struct buffer *output = buffer_new();
int size = buffer_readable_size(input);
for (int i = 0; i < size; i++) {
buffer_append_char(output, rot13_char(buffer_read_char(input)));
}
tcp_connection_send_buffer(tcpConnection, output);
return 0;
}
//数据通过buffer写完之后的callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {
printf("write completed\n");
return 0;
}
//连接关闭之后的callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {
printf("connection closed\n");
return 0;
}
int main(int c, char **v) {
//主线程event_loop
struct event_loop *eventLoop = event_loop_init();
//初始化acceptor
struct acceptor *acceptor = acceptor_init(SERV_PORT);
//初始tcp_server,可以指定线程数目,这里线程是4,说明是一个acceptor线程,4个I/O线程,没一个I/O线程
//tcp_server自己带一个event_loop
struct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,
onWriteCompleted, onConnectionClosed, 4);
tcp_server_start(tcpServer);
// main thread for acceptor
event_loop_run(eventLoop);
}
我们的样例程序几乎和第26讲的一样,唯一的不同是在创建TCPServer时,线程的数量设置不再是0,而是4。这里线程是4,说明是一个主acceptor线程,4个从reactor线程,每一个线程都跟一个event_loop一一绑定。
你可能会问,这么简单就完成了主、从线程的配置?
答案是YES。这其实是设计框架需要考虑的地方,一个框架不仅要考虑性能、扩展性,也需要考虑可用性。可用性部分就是程序开发者如何使用框架。如果我是一个开发者,我肯定关心框架的使用方式是不是足够方便,配置是不是足够灵活等。
像这里,可以根据需求灵活地配置主、从反应堆线程,就是一个易用性的体现。当然,因为时间有限,我没有考虑woker线程的部分,这部分其实应该是应用程序自己来设计考虑。网络编程框架通过回调函数暴露了交互的接口,这里应用程序开发者完全可以在onMessage方法里面获取一个子线程来处理encode、compute和encode的工作,像下面的示范代码一样。
//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
printf("get message from tcp connection %s\n", tcpConnection->name);
printf("%s", input->data);
//取出一个线程来负责decode、compute和encode
struct buffer *output = thread_handle(input);
//处理完之后再通过reactor I/O线程发送数据
tcp_connection_send_buffer(tcpConnection, output);
return
样例程序结果
我们启动这个服务器端程序,你可以从服务器端的输出上看到使用了poll作为事件分发方式。
多打开几个telnet客户端交互,main-thread只负责新的连接建立,每个客户端数据的收发由不同的子线程Thread-1、Thread-2、Thread-3和Thread-4来提供服务。
这里由于使用了子线程进行I/O处理,主线程可以专注于新连接处理,从而大大提高了客户端连接成功率。
$./poll-server-multithreads
[msg] set poll as dispatcher
[msg] add channel fd == 4, main thread
[msg] poll added channel fd==4
[msg] set poll as dispatcher
[msg] add channel fd == 7, main thread
[msg] poll added channel fd==7
[msg] event loop thread init and signal, Thread-1
[msg] event loop run, Thread-1
[msg] event loop thread started, Thread-1
[msg] set poll as dispatcher
[msg] add channel fd == 9, main thread
[msg] poll added channel fd==9
[msg] event loop thread init and signal, Thread-2
[msg] event loop run, Thread-2
[msg] event loop thread started, Thread-2
[msg] set poll as dispatcher
[msg] add channel fd == 11, main thread
[msg] poll added channel fd==11
[msg] event loop thread init and signal, Thread-3
[msg] event loop thread started, Thread-3
[msg] set poll as dispatcher
[msg] event loop run, Thread-3
[msg] add channel fd == 13, main thread
[msg] poll added channel fd==13
[msg] event loop thread init and signal, Thread-4
[msg] event loop run, Thread-4
[msg] event loop thread started, Thread-4
[msg] add channel fd == 5, main thread
[msg] poll added channel fd==5
[msg] event loop run, main thread
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 14
connection completed
[msg] get message channel i==0, fd==7
[msg] activate channel fd == 7, revents=2, Thread-1
[msg] wakeup, Thread-1
[msg] add channel fd == 14, Thread-1
[msg] poll added channel fd==14
[msg] get message channel i==1, fd==14
[msg] activate channel fd == 14, revents=2, Thread-1
get message from tcp connection connection-14
fasfas
[msg] get message channel i==1, fd==14
[msg] activate channel fd == 14, revents=2, Thread-1
get message from tcp connection connection-14
fasfas
asfa
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 15
connection completed
[msg] get message channel i==0, fd==9
[msg] activate channel fd == 9, revents=2, Thread-2
[msg] wakeup, Thread-2
[msg] add channel fd == 15, Thread-2
[msg] poll added channel fd==15
[msg] get message channel i==1, fd==15
[msg] activate channel fd == 15, revents=2, Thread-2
get message from tcp connection connection-15
afasdfasf
[msg] get message channel i==1, fd==15
[msg] activate channel fd == 15, revents=2, Thread-2
get message from tcp connection connection-15
afasdfasf
safsafa
[msg] get message channel i==1, fd==15
[msg] activate channel fd == 15, revents=2, Thread-2
[msg] poll delete channel fd==15
connection closed
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 16
connection completed
[msg] get message channel i==0, fd==11
[msg] activate channel fd == 11, revents=2, Thread-3
[msg] wakeup, Thread-3
[msg] add channel fd == 16, Thread-3
[msg] poll added channel fd==16
[msg] get message channel i==1, fd==16
[msg] activate channel fd == 16, revents=2, Thread-3
get message from tcp connection connection-16
fdasfasdf
[msg] get message channel i==1, fd==14
[msg] activate channel fd == 14, revents=2, Thread-1
[msg] poll delete channel fd==14
connection closed
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 17
connection completed
[msg] get message channel i==0, fd==13
[msg] activate channel fd == 13, revents=2, Thread-4
[msg] wakeup, Thread-4
[msg] add channel fd == 17, Thread-4
[msg] poll added channel fd==17
[msg] get message channel i==1, fd==17
[msg] activate channel fd == 17, revents=2, Thread-4
get message from tcp connection connection-17
qreqwrq
[msg] get message channel i==1, fd==16
[msg] activate channel fd == 16, revents=2, Thread-3
[msg] poll delete channel fd==16
connection closed
[msg] get message channel i==1, fd==5
[msg] activate channel fd == 5, revents=2, main thread
[msg] new connection established, socket == 18
connection completed
[msg] get message channel i==0, fd==7
[msg] activate channel fd == 7, revents=2, Thread-1
[msg] wakeup, Thread-1
[msg] add channel fd == 18, Thread-1
[msg] poll added channel fd==18
[msg] get message channel i==1, fd==18
[msg] activate channel fd == 18, revents=2, Thread-1
get message from tcp connection connection-18
fasgasdg
^C
总结
本讲主要讲述了主从reactor模式,主从reactor模式中,主reactor只负责连接建立的处理,而把已连接套接字的I/O事件分发交给从reactor线程处理,这大大提高了客户端连接的处理能力。从Netty的实现上来看,也遵循了这一原则。
28 使用epoll和多线程模型
基于poll事件分发的reactor反应堆模式,以及主从反应堆模式。和poll相比,Linux提供的epoll是一种更为高效的事件分发机制。将切换到epoll实现的主从反应堆模式,并且分析一下为什么epoll的性能会强于poll等传统的事件分发机制。
如何切换到epoll
已经将所有的代码已经放置到 GitHub 上,你可以自行查看或下载。
我们的网络编程框架是可以同时支持poll和epoll机制的,那么如何开启epoll的支持呢?
lib/event_loop.c文件的event_loop_init_with_name函数是关键,可以看到,这里是通过宏EPOLL_ENABLE来决定是使用epoll还是poll的。
struct event_loop *event_loop_init_with_name(char *thread_name) {
...
#ifdef EPOLL_ENABLE
yolanda_msgx("set epoll as dispatcher, %s", eventLoop->thread_name);
eventLoop->eventDispatcher = &epoll_dispatcher;
#else
yolanda_msgx("set poll as dispatcher, %s", eventLoop->thread_name);
eventLoop->eventDispatcher = &poll_dispatcher;
#endif
eventLoop->event_dispatcher_data = eventLoop->eventDispatcher->init(eventLoop);
...
}
在根目录下的CMakeLists.txt文件里,引入CheckSymbolExists,如果系统里有epoll_create函数和sys/epoll.h,就自动开启EPOLL_ENABLE。如果没有,EPOLL_ENABLE就不会开启,自动使用poll作为默认的事件分发机制。
# check epoll and add config.h for the macro compilation
include(CheckSymbolExists)
check_symbol_exists(epoll_create "sys/epoll.h" EPOLL_EXISTS)
if (EPOLL_EXISTS)
# Linux下设置为epoll
set(EPOLL_ENABLE 1 CACHE INTERNAL "enable epoll")
# Linux下也设置为poll
# set(EPOLL_ENABLE "" CACHE INTERNAL "not enable epoll")
else ()
set(EPOLL_ENABLE "" CACHE INTERNAL "not enable epoll")
endif ()
但是,为了能让编译器使用到这个宏,需要让CMake往config.h文件里写入这个宏的最终值,configure_file命令就是起这个作用的。其中config.h.cmake是一个模板文件,已经预先创建在根目录下。同时还需要让编译器include这个config.h文件。include_directories可以帮我们达成这个目标。
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/include/config.h)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/include)
这样,在Linux下,就会默认使用epoll作为事件分发。
那么前面的 26讲 和 27讲 中的程序案例如何改为使用poll的呢?
我们可以修改CMakeLists.txt文件,把Linux下设置为poll的那段注释下的命令打开,同时关闭掉原先设置为1的命令就可以了。 下面就是具体的示例代码。
# check epoll and add config.h for the macro compilation
include(CheckSymbolExists)
check_symbol_exists(epoll_create "sys/epoll.h" EPOLL_EXISTS)
if (EPOLL_EXISTS)
# Linux下也设置为poll
set(EPOLL_ENABLE "" CACHE INTERNAL "not enable epoll")
else ()
set(EPOLL_ENABLE "" CACHE INTERNAL "not enable epoll")
endif (
不管怎样,现在我们得到了一个Linux下使用epoll作为事件分发的版本。
样例程序
我们的样例程序和 第27讲 的一模一样,只是现在我们的事件分发机制从poll切换到了epoll。
#include <lib/acceptor.h>
#include "lib/common.h"
#include "lib/event_loop.h"
#include "lib/tcp_server.h"
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
//连接建立之后的callback
int onConnectionCompleted(struct tcp_connection *tcpConnection) {
printf("connection completed\n");
return 0;
}
//数据读到buffer之后的callback
int onMessage(struct buffer *input, struct tcp_connection *tcpConnection) {
printf("get message from tcp connection %s\n", tcpConnection->name);
printf("%s", input->data);
struct buffer *output = buffer_new();
int size = buffer_readable_size(input);
for (int i = 0; i < size; i++) {
buffer_append_char(output, rot13_char(buffer_read_char(input)));
}
tcp_connection_send_buffer(tcpConnection, output);
return 0;
}
//数据通过buffer写完之后的callback
int onWriteCompleted(struct tcp_connection *tcpConnection) {
printf("write completed\n");
return 0;
}
//连接关闭之后的callback
int onConnectionClosed(struct tcp_connection *tcpConnection) {
printf("connection closed\n");
return 0;
}
int main(int c, char **v) {
//主线程event_loop
struct event_loop *eventLoop = event_loop_init();
//初始化acceptor
struct acceptor *acceptor = acceptor_init(SERV_PORT);
//初始tcp_server,可以指定线程数目,这里线程是4,说明是一个acceptor线程,4个I/O线程,没一个I/O线程
//tcp_server自己带一个event_loop
struct TCPserver *tcpServer = tcp_server_init(eventLoop, acceptor, onConnectionCompleted, onMessage,
onWriteCompleted, onConnectionClosed, 4);
tcp_server_start(tcpServer);
// main thread for acceptor
event_loop_run(eventLoop);
}
关于这个程序,之前一直没有讲到的部分是缓冲区对象buffer。这其实也是网络编程框架应该考虑的部分。
我们希望框架可以对应用程序封装掉套接字读和写的部分,转而提供的是针对缓冲区对象的读和写操作。这样一来,从套接字收取数据、处理异常、发送数据等操作都被类似buffer这样的对象所封装和屏蔽,应用程序所要做的事情就会变得更加简单,从buffer对象中可以获取已接收到的字节流再进行应用层处理,比如这里通过调用buffer_read_char函数从buffer中读取一个字节。
另外一方面,框架也必须对应用程序提供套接字发送的接口,接口的数据类型类似这里的buffer对象,可以看到,这里先生成了一个buffer对象,之后将编码后的结果填充到buffer对象里,最后调用tcp_connection_send_buffer将buffer对象里的数据通过套接字发送出去。
这里像onMessage、onConnectionClosed几个回调函数都是运行在子反应堆线程中的,也就是说,刚刚提到的生成buffer对象,encode部分的代码,是在子反应堆线程中执行的。这其实也是回调函数的内涵,回调函数本身只是提供了类似Handlder的处理逻辑,具体执行是由事件分发线程,或者说是event loop线程发起的。
框架通过一层抽象,让应用程序的开发者只需要看到回调函数,回调函数中的对象,也都是如buffer和tcp_connection这样封装过的对象,这样像套接字、字节流等底层实现的细节就完全由框架来完成了。
样例程序结果
启动服务器,可以从屏幕输出上看到,使用的是epoll作为事件分发器。
$./epoll-server-multithreads
[msg] set epoll as dispatcher, main thread
[msg] add channel fd == 5, main thread
[msg] set epoll as dispatcher, Thread-1
[msg] add channel fd == 9, Thread-1
[msg] event loop thread init and signal, Thread-1
[msg] event loop run, Thread-1
[msg] event loop thread started, Thread-1
[msg] set epoll as dispatcher, Thread-2
[msg] add channel fd == 12, Thread-2
[msg] event loop thread init and signal, Thread-2
[msg] event loop run, Thread-2
[msg] event loop thread started, Thread-2
[msg] set epoll as dispatcher, Thread-3
[msg] add channel fd == 15, Thread-3
[msg] event loop thread init and signal, Thread-3
[msg] event loop run, Thread-3
[msg] event loop thread started, Thread-3
[msg] set epoll as dispatcher, Thread-4
[msg] add channel fd == 18, Thread-4
[msg] event loop thread init and signal, Thread-4
[msg] event loop run, Thread-4
[msg] event loop thread started, Thread-4
[msg] add channel fd == 6, main thread
[msg] event loop run, main thread
开启多个telnet客户端,连接上该服务器, 通过屏幕输入和服务器端交互。
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
fafaf
snsns
^]
telnet> quit
Connection closed.
服务端显示不断地从epoll_wait中返回处理I/O事件。
[msg] epoll_wait wakeup, main thread
[msg] get message channel fd==6 for read, main thread
[msg] activate channel fd == 6, revents=2, main thread
[msg] new connection established, socket == 19
connection completed
[msg] epoll_wait wakeup, Thread-1
[msg] get message channel fd==9 for read, Thread-1
[msg] activate channel fd == 9, revents=2, Thread-1
[msg] wakeup, Thread-1
[msg] add channel fd == 19, Thread-1
[msg] epoll_wait wakeup, Thread-1
[msg] get message channel fd==19 for read, Thread-1
[msg] activate channel fd == 19, revents=2, Thread-1
get message from tcp connection connection-19
afasf
[msg] epoll_wait wakeup, main thread
[msg] get message channel fd==6 for read, main thread
[msg] activate channel fd == 6, revents=2, main thread
[msg] new connection established, socket == 20
connection completed
[msg] epoll_wait wakeup, Thread-2
[msg] get message channel fd==12 for read, Thread-2
[msg] activate channel fd == 12, revents=2, Thread-2
[msg] wakeup, Thread-2
[msg] add channel fd == 20, Thread-2
[msg] epoll_wait wakeup, Thread-2
[msg] get message channel fd==20 for read, Thread-2
[msg] activate channel fd == 20, revents=2, Thread-2
get message from tcp connection connection-20
asfasfas
[msg] epoll_wait wakeup, Thread-2
[msg] get message channel fd==20 for read, Thread-2
[msg] activate channel fd == 20, revents=2, Thread-2
connection closed
[msg] epoll_wait wakeup, main thread
[msg] get message channel fd==6 for read, main thread
[msg] activate channel fd == 6, revents=2, main thread
[msg] new connection established, socket == 21
connection completed
[msg] epoll_wait wakeup, Thread-3
[msg] get message channel fd==15 for read, Thread-3
[msg] activate channel fd == 15, revents=2, Thread-3
[msg] wakeup, Thread-3
[msg] add channel fd == 21, Thread-3
[msg] epoll_wait wakeup, Thread-3
[msg] get message channel fd==21 for read, Thread-3
[msg] activate channel fd == 21, revents=2, Thread-3
get message from tcp connection connection-21
dfasfadsf
[msg] epoll_wait wakeup, Thread-1
[msg] get message channel fd==19 for read, Thread-1
[msg] activate channel fd == 19, revents=2, Thread-1
connection closed
[msg] epoll_wait wakeup, main thread
[msg] get message channel fd==6 for read, main thread
[msg] activate channel fd == 6, revents=2, main thread
[msg] new connection established, socket == 22
connection completed
[msg] epoll_wait wakeup, Thread-4
[msg] get message channel fd==18 for read, Thread-4
[msg] activate channel fd == 18, revents=2, Thread-4
[msg] wakeup, Thread-4
[msg] add channel fd == 22, Thread-4
[msg] epoll_wait wakeup, Thread-4
[msg] get message channel fd==22 for read, Thread-4
[msg] activate channel fd == 22, revents=2, Thread-4
get message from tcp connection connection-22
fafaf
[msg] epoll_wait wakeup, Thread-4
[msg] get message channel fd==22 for read, Thread-4
[msg] activate channel fd == 22, revents=2, Thread-4
connection closed
[msg] epoll_wait wakeup, Thread-3
[msg] get message channel fd==21 for read, Thread-3
[msg] activate channel fd == 21, revents=2, Thread-3
connection closed
其中主线程的epoll_wait只处理acceptor套接字的事件,表示的是连接的建立;反应堆子线程的epoll_wait主要处理的是已连接套接字的读写事件。这幅图详细解释了这部分逻辑。
epoll的性能分析
epoll的性能凭什么就要比poll或者select好呢?这要从两个角度来说明。
第一个角度是事件集合。在每次使用poll或select之前,都需要准备一个感兴趣的事件集合,系统内核拿到事件集合,进行分析并在内核空间构建相应的数据结构来完成对事件集合的注册。而epoll则不是这样,epoll维护了一个全局的事件集合,通过epoll句柄,可以操纵这个事件集合,增加、删除或修改这个事件集合里的某个元素。要知道在绝大多数情况下,事件集合的变化没有那么的大,这样操纵系统内核就不需要每次重新扫描事件集合,构建内核空间数据结构。
第二个角度是就绪列表。每次在使用poll或者select之后,应用程序都需要扫描整个感兴趣的事件集合,从中找出真正活动的事件,这个列表如果增长到10K以上,每次扫描的时间损耗也是惊人的。事实上,很多情况下扫描完一圈,可能发现只有几个真正活动的事件。而epoll则不是这样,epoll返回的直接就是活动的事件列表,应用程序减少了大量的扫描时间。
此外, epoll还提供了更高级的能力——边缘触发。 第23讲 通过一个直观的例子,讲解了边缘触发和条件触发的区别。
这里再举一个例子说明一下。
如果某个套接字有100个字节可以读,边缘触发(edge-triggered)和条件触发(level-triggered)都会产生read ready notification事件,如果应用程序只读取了50个字节,边缘触发就会陷入等待;而条件触发则会因为还有50个字节没有读取完,不断地产生read ready notification事件。
在条件触发下(level-triggered),如果某个套接字缓冲区可以写,会无限次返回write ready notification事件,在这种情况下,如果应用程序没有准备好,不需要发送数据,一定需要解除套接字上的ready notification事件,否则CPU就直接跪了。
我们简单地总结一下,边缘触发只会产生一次活动事件,性能和效率更高。不过,程序处理起来要更为小心。
select
、poll
和 epoll
是三种常用的多路复用技术,用于在单个线程中同时监控多个文件描述符(通常是网络套接字)的状态变化。下面分别介绍它们的优点和缺点:
select
优点:
- 简单易用:
select
是最古老的多路复用机制之一,几乎所有的操作系统都支持。 - 跨平台兼容性好: 它在多种平台上都有良好的支持,包括 Unix/Linux、Windows 等。
- 易于理解和实现: 对于简单的场景,
select
的使用非常直观。
缺点:
- 最大文件描述符限制: 在大多数系统上,
select
监控的文件描述符数量有上限,通常是 1024。 - 效率较低: 每次调用
select
都需要将文件描述符集合从用户空间复制到内核空间,再从内核空间复制回用户空间,这会导致一定的性能开销。 - 缺乏扩展性:
select
不支持边缘触发模式,这使得它在高并发场景下效率较低。
poll
优点:
- 没有文件描述符数量限制:
poll
没有像select
那样的文件描述符数量限制,理论上可以支持任意数量的文件描述符。 - 支持多种事件类型:
poll
支持多种类型的事件,如读事件、写事件等。
缺点:
- 效率问题: 尽管
poll
没有文件描述符数量的限制,但它仍然需要遍历整个文件描述符列表来查找就绪的文件描述符,这在文件描述符非常多的情况下会导致效率问题。 - 不支持边缘触发: 类似
select
,poll
也不支持边缘触发模式,这可能会影响性能。
epoll
优点:
- 高效:
epoll
是 Linux 2.6 及以上版本引入的一种改进的多路复用技术,相比select
和poll
更高效。 - 无文件描述符限制:
epoll
使用文件描述符索引来跟踪就绪的文件描述符,因此没有文件描述符数量的限制。 - 边缘触发模式:
epoll
支持边缘触发 (ET) 模式,这可以减少不必要的上下文切换,提高性能。 - 只通知就绪的文件描述符:
epoll
只返回就绪的文件描述符,而不是像select
和poll
那样返回所有文件描述符的列表。 - 灵活:
epoll
提供了添加、修改和删除文件描述符的功能,这使得它可以动态地管理文件描述符。
缺点:
- 仅限于 Linux 平台:
epoll
是 Linux 特有的功能,如果需要跨平台的话,可能需要实现额外的抽象层。 - 复杂性:
epoll
相比select
和poll
更复杂,对于初学者来说可能更难理解和使用。
总的来说,epoll
在现代 Linux 系统中是最高效的选择,尤其是在高并发场景下。而 select
和 poll
则更适合于简单的应用或跨平台的项目。选择哪种技术主要取决于具体的应用需求、目标平台和预期的并发水平。
总结
将程序框架切换到了epoll的版本,和poll版本相比,只是底层的框架做了更改,上层应用程序不用做任何修改,这也是程序框架强大的地方。和poll相比,epoll从事件集合和就绪列表两个方面加强了程序性能,是Linux下高性能网络程序的首选。
29 真正的大杀器:异步I-O
阻塞I/O、非阻塞I/O以及像select、poll、epoll等I/O多路复用技术,并在此基础上结合线程技术,实现了以事件分发为核心的reactor反应堆模式。还听说过一个叫做Proactor的网络事件驱动模式,这个Proactor模式和reactor模式到底有什么区别和联系呢?
阻塞/非阻塞 VS 同步/异步
第一种是阻塞I/O。阻塞I/O发起的read请求,线程会被挂起,一直等到内核数据准备好,并把数据从内核区域拷贝到应用程序的缓冲区中,当拷贝过程完成,read请求调用才返回。接下来,应用程序就可以对缓冲区的数据进行数据解析。
第二种是非阻塞I/O。非阻塞的read请求在数据未准备好的情况下立即返回,应用程序可以不断轮询内核,直到数据准备好,内核将数据拷贝到应用程序缓冲,并完成这次read调用。注意,这里最后一次read调用,获取数据的过程, 是一个同步的过程。这里的同步指的是内核区域的数据拷贝到缓冲区的这个过程。
每次让应用程序去轮询内核的I/O是否准备好,是一个不经济的做法,因为在轮询的过程中应用进程啥也不能干。于是,像select、poll这样的I/O多路复用技术就隆重登场了。通过I/O事件分发,当内核数据准备好时,再通知应用程序进行操作。这个做法大大改善了应用进程对CPU的利用率,在没有被通知的情况下,应用进程可以使用CPU做其他的事情。
注意,这里read调用,获取数据的过程, 也是一个同步的过程。
第一种阻塞I/O我想你已经比较了解了,在阻塞I/O的情况下,应用程序会被挂起,直到获取数据。第二种非阻塞I/O和第三种基于非阻塞I/O的多路复用技术,获取数据的操作不会被阻塞。
无论是第一种阻塞I/O,还是第二种非阻塞I/O,第三种基于非阻塞I/O的多路复用都是 同步调用技术。为什么这么说呢?因为同步调用、异步调用的说法,是对于获取数据的过程而言的,前面几种最后获取数据的read操作调用,都是同步的,在read调用时,内核将数据从内核空间拷贝到应用程序空间,这个过程是在read函数中同步进行的,如果内核实现的拷贝效率很差,read调用就会在这个同步过程中消耗比较长的时间。
而真正的异步调用则不用担心这个问题,我们接下来就来介绍第四种I/O技术,当我们发起aio_read之后,就立即返回,内核自动将数据从内核空间拷贝到应用程序空间,这个拷贝过程是异步的,内核自动完成的,和前面的同步操作不一样,应用程序并不需要主动发起拷贝动作。
还记得 第22 讲 中讲到的去书店买书的例子吗? 基于这个例子,针对以上的场景,我们可以这么理解。
第一种阻塞I/O就是你去了书店,告诉老板你想要某本书,然后你就一直在那里等着,直到书店老板翻箱倒柜找到你想要的书。
第二种非阻塞I/O类似于你去了书店,问老板有没有一本书,老板告诉你没有,你就离开了。一周以后,你又来这个书店,再问这个老板,老板一查,有了,于是你买了这本书。
第三种基于非阻塞的I/O多路复用,你来到书店告诉老板:“老板,到货给我打电话吧,我再来付钱取书。”
第四种异步I/O就是你连去书店取书的过程也想省了,你留下地址,付了书费,让老板到货时寄给你,你直接在家里拿到就可以看了。
这里放置了一张表格,总结了以上几种I/O模型。
aio_read和aio_write的用法
听起来,异步I/O有一种高大上的感觉。其实,异步I/O用起来倒是挺简单的。下面我们看一下一个具体的例子:
#include "lib/common.h"
#include <aio.h>
const int BUF_SIZE = 512;
int main() {
int err;
int result_size;
// 创建一个临时文件
char tmpname[256];
snprintf(tmpname, sizeof(tmpname), "/tmp/aio_test_%d", getpid());
unlink(tmpname);
int fd = open(tmpname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
if (fd == -1) {
error(1, errno, "open file failed ");
}
char buf[BUF_SIZE];
struct aiocb aiocb;
//初始化buf缓冲,写入的数据应该为0xfafa这样的,
memset(buf, 0xfa, BUF_SIZE);
memset(&aiocb, 0, sizeof(struct aiocb));
aiocb.aio_fildes = fd;
aiocb.aio_buf = buf;
aiocb.aio_nbytes = BUF_SIZE;
//开始写
if (aio_write(&aiocb) == -1) {
printf(" Error at aio_write(): %s\n", strerror(errno));
close(fd);
exit(1);
}
//因为是异步的,需要判断什么时候写完
while (aio_error(&aiocb) == EINPROGRESS) {
printf("writing... \n");
}
//判断写入的是否正确
err = aio_error(&aiocb);
result_size = aio_return(&aiocb);
if (err != 0 || result_size != BUF_SIZE) {
printf(" aio_write failed() : %s\n", strerror(err));
close(fd);
exit(1);
}
//下面准备开始读数据
char buffer[BUF_SIZE];
struct aiocb cb;
cb.aio_nbytes = BUF_SIZE;
cb.aio_fildes = fd;
cb.aio_offset = 0;
cb.aio_buf = buffer;
// 开始读数据
if (aio_read(&cb) == -1) {
printf(" air_read failed() : %s\n", strerror(err));
close(fd);
}
//因为是异步的,需要判断什么时候读完
while (aio_error(&cb) == EINPROGRESS) {
printf("Reading... \n");
}
// 判断读是否成功
int numBytes = aio_return(&cb);
if (numBytes != -1) {
printf("Success.\n");
} else {
printf("Error.\n");
}
// 清理文件句柄
close(fd);
return 0;
}
这个程序展示了如何使用aio系列函数来完成异步读写。主要用到的函数有:
- aio_write:用来向内核提交异步写操作;
- aio_read:用来向内核提交异步读操作;
- aio_error:获取当前异步操作的状态;
- aio_return:获取异步操作读、写的字节数。
这个程序一开始使用aio_write方法向内核提交了一个异步写文件的操作。第23-27行是这个异步写操作的结构体。结构体aiocb是应用程序和操作系统内核传递的异步申请数据结构,这里我们使用了文件描述符、缓冲区指针aio_buf以及需要写入的字节数aio_nbytes。
struct aiocb {
int aio_fildes; /* File descriptor */
off_t aio_offset; /* File offset */
volatile void *aio_buf; /* Location of buffer */
size_t aio_nbytes; /* Length of transfer */
int aio_reqprio; /* Request priority offset */
struct sigevent aio_sigevent; /* Signal number and value */
int aio_lio_opcode; /* Operation to be performed */
};
这里我们用了一个0xfa的缓冲区,这在后面的演示中可以看到结果。
30-34行向系统内核申请了这个异步写操作,并且在37-39行查询异步动作的结果,当其结束时在42-48行判断写入的结果是否正确。
紧接着,我们使用了aio_read从文件中读取这些数据。为此,我们准备了一个新的aiocb结构体,告诉内核需要把数据拷贝到buffer这个缓冲区中,和异步写一样,发起异步读之后在第65-67行一直查询异步读动作的结果。
接下来运行这个程序,我们看到屏幕上打印出一系列的字符,显示了这个操作是有内核在后台帮我们完成的。
./aio01
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
writing...
Reading...
Reading...
Reading...
Reading...
Reading...
Reading...
Reading...
Reading...
Reading...
Success.
打开/tmp目录下的aio_test_xxxx文件,可以看到,这个文件成功写入了我们期望的数据。
请注意,以上的读写,都不需要我们在应用程序里再发起调用,系统内核直接帮我们做好了。
Linux下socket套接字的异步支持
aio系列函数是由POSIX定义的异步操作接口,可惜的是,Linux下的aio操作,不是真正的操作系统级别支持的,它只是由GNU libc库函数在用户空间借由pthread方式实现的,而且仅仅针对磁盘类I/O,套接字I/O不支持。
也有很多Linux的开发者尝试在操作系统内核中直接支持aio,例如一个叫做Ben LaHaise的人,就将aio实现成功merge到2.5.32中,这部分能力是作为patch存在的,但是,它依旧不支持套接字。
Solaris倒是有真正的系统系别的aio,不过还不是很确定它在套接字上的性能表现,特别是和磁盘I/O相比效果如何。
综合以上结论就是,Linux下对异步操作的支持非常有限,这也是为什么使用epoll等多路分发技术加上非阻塞I/O来解决Linux下高并发高性能网络I/O问题的根本原因。
Windows下的IOCP和Proactor模式
和Linux不同,Windows下实现了一套完整的支持套接字的异步编程接口,这套接口一般被叫做IOCompletetionPort(IOCP)。
这样,就产生了基于IOCP的所谓Proactor模式。
和Reactor模式一样,Proactor模式也存在一个无限循环运行的event loop线程,但是不同于Reactor模式,这个线程并不负责处理I/O调用,它只是负责在对应的read、write操作完成的情况下,分发完成事件到不同的处理函数。
这里举一个HTTP服务请求的例子来说明:
- 客户端发起一个GET请求;
- 这个GET请求对应的字节流被内核读取完成,内核将这个完成事件放置到一个队列中;
- event loop线程,也就是Poractor从这个队列里获取事件,根据事件类型,分发到不同的处理函数上,比如一个http handle的onMessage解析函数;
- HTTP request解析函数完成报文解析;
- 业务逻辑处理,比如读取数据库的记录;
- 业务逻辑处理完成,开始encode,完成之后,发起一个异步写操作;
- 这个异步写操作被内核执行,完成之后这个异步写操作被放置到内核的队列中;
- Proactor线程获取这个完成事件,分发到HTTP handler的onWriteCompled方法执行。
从这个例子可以看出,由于系统内核提供了真正的“异步”操作,Proactor不会再像Reactor一样,每次感知事件后再调用read、write方法完成数据的读写,它只负责感知事件完成,并由对应的handler发起异步读写请求,I/O读写操作本身是由系统内核完成的。和前面看到的aio的例子一样,这里需要传入数据缓冲区的地址等信息,这样,系统内核才可以自动帮我们把数据的读写工作完成。
无论是Reactor模式,还是Proactor模式,都是一种基于事件分发的网络编程模式。 Reactor模式是基于待完成的I/O事件,而Proactor模式则是基于已完成的I/O事件,两者的本质,都是借由事件分发的思想,设计出可兼容、可扩展、接口友好的一套程序框架。
总结
和同步I/O相比,异步I/O的读写动作由内核自动完成,不过,在Linux下目前仅仅支持简单的基于本地文件的aio异步操作,这也使得我们在编写高性能网络程序时,首选Reactor模式,借助epoll这样的I/O分发技术完成开发;而Windows下的IOCP则是一种异步I/O的技术,并由此产生了和Reactor齐名的Proactor模式,借助这种模式,可以完成Windows下高性能网络程序设计。
30 epoll源码深度剖析
基本数据结构
在开始研究源代码之前,我们先看一下epoll中使用的数据结构,分别是eventpoll、epitem和eppoll_entry。
我们先看一下eventpoll这个数据结构,这个数据结构是我们在调用epoll_create之后内核侧创建的一个句柄,表示了一个epoll实例。后续如果我们再调用epoll_ctl和epoll_wait等,都是对这个eventpoll数据进行操作,这部分数据会被保存在epoll_create创建的匿名文件file的private_data字段中。
/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
//这个队列里存放的是执行epoll_wait从而等待的进程队列
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
//这个队列里存放的是该eventloop作为poll对象的一个实例,加入到等待的队列
//这是因为eventpoll本身也是一个file, 所以也会有poll操作
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
//这里存放的是事件就绪的fd列表,链表的每个元素是下面的epitem
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
//这是用来快速查找fd的红黑树
struct rb_root_cached rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
//这是eventloop对应的匿名文件,充分体现了Linux下一切皆文件的思想
struct file *file;
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};
你能看到在代码中我提到了epitem,这个epitem结构是干什么用的呢?
每当我们调用epoll_ctl增加一个fd时,内核就会为我们创建出一个epitem实例,并且把这个实例作为红黑树的一个子节点,增加到eventpoll结构体中的红黑树中,对应的字段是rbr。这之后,查找每一个fd上是否有事件发生都是通过红黑树上的epitem来操作。
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
* Avoid increasing the size of this struct, there can be many thousands
* of these on a server and we do not want this to take another cache line.
*/
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
};
/* List header used to link this structure to the eventpoll ready list */
//将这个epitem连接到eventpoll 里面的rdllist的list指针
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
//epoll监听的fd
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
//一个文件可以被多个epoll实例所监听,这里就记录了当前文件被监听的次数
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
//当前epollitem所属的eventpoll
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;
/* The structure that describe the interested events and the source fd */
struct epoll_event event;
};
每次当一个fd关联到一个epoll实例,就会有一个eppoll_entry产生。eppoll_entry的结构如下:
/* Wait structure used by the poll hooks */
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct list_head llink;
/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;
/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
wait_queue_entry_t wait;
/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};
epoll_create
我们在使用epoll的时候,首先会调用epoll_create来创建一个epoll实例。这个函数是如何工作的呢?
首先,epoll_create会对传入的flags参数做简单的验证。
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
接下来,内核申请分配eventpoll需要的内存空间。
/* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
在接下来,epoll_create为epoll实例分配了匿名文件和文件描述字,其中fd是文件描述字,file是一个匿名文件。这里充分体现了UNIX下一切都是文件的思想。注意,eventpoll的实例会保存一份匿名文件的引用,通过调用fd_install函数将匿名文件和文件描述字完成了绑定。
这里还有一个特别需要注意的地方,在调用anon_inode_get_file的时候,epoll_create将eventpoll作为匿名文件file的private_data保存了起来,这样,在之后通过epoll实例的文件描述字来查找时,就可以快速地定位到eventpoll对象了。
最后,这个文件描述字作为epoll的文件句柄,被返回给epoll_create的调用者。
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
epoll_ctl
接下来,我们看一下一个套接字是如何被添加到epoll实例中的。这就要解析一下epoll_ctl函数实现了。
查找epoll实例
首先,epoll_ctl函数通过epoll实例句柄来获得对应的匿名文件,这一点很好理解,UNIX下一切都是文件,epoll的实例也是一个匿名文件。
//获得epoll实例对应的匿名文件
f = fdget(epfd);
if (!f.file)
goto error_return;
接下来,获得添加的套接字对应的文件,这里tf表示的是target file,即待处理的目标文件。
/* Get the "struct file *" for the target file */
//获得真正的文件,如监听套接字、读写套接字
tf = fdget(fd);
if (!tf.file)
goto error_fput;
再接下来,进行了一系列的数据验证,以保证用户传入的参数是合法的,比如epfd真的是一个epoll实例句柄,而不是一个普通文件描述符。
/* The target file descriptor must support poll */
//如果不支持poll,那么该文件描述字是无效的
error = -EPERM;
if (!tf.file->f_op->poll)
goto error_tgt_fput;
...
如果获得了一个真正的epoll实例句柄,就可以通过private_data获取之前创建的eventpoll实例了。
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;
红黑树查找
接下来epoll_ctl通过目标文件和对应描述字,在红黑树中查找是否存在该套接字,这也是epoll为什么高效的地方。红黑树(RB-tree)是一种常见的数据结构,这里eventpoll通过红黑树跟踪了当前监听的所有文件描述字,而这棵树的根就保存在eventpoll数据结构中。
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;
对于每个被监听的文件描述字,都有一个对应的epitem与之对应,epitem作为红黑树中的节点就保存在红黑树中。
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tf.file, fd);
红黑树是一棵二叉树,作为二叉树上的节点,epitem必须提供比较能力,以便可以按大小顺序构建出一棵有序的二叉树。其排序能力是依靠epoll_filefd结构体来完成的,epoll_filefd可以简单理解为需要监听的文件描述字,它对应到二叉树上的节点。
可以看到这个还是比较好理解的,按照文件的地址大小排序。如果两个相同,就按照文件文件描述字来排序。
struct epoll_filefd {
struct file *file; // pointer to the target file struct corresponding to the fd
int fd; // target file descriptor number
} __packed;
/* Compare RB tree keys */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
struct epoll_filefd *p2)
{
return (p1->file > p2->file ? +1:
(p1->file < p2->file ? -1 : p1->fd - p2->fd));
}
在进行完红黑树查找之后,如果发现是一个ADD操作,并且在树中没有找到对应的二叉树节点,就会调用ep_insert进行二叉树节点的增加。
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
ep_insert
ep_insert首先判断当前监控的文件值是否超过了/proc/sys/fs/epoll/max_user_watches的预设最大值,如果超过了则直接返回错误。
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
接下来是分配资源和初始化动作。
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
再接下来的事情非常重要,ep_insert会为加入的每个文件描述字设置回调函数。这个回调函数是通过函数ep_ptable_queue_proc来进行设置的。这个回调函数是干什么的呢?其实,对应的文件描述字上如果有事件发生,就会调用这个函数,比如套接字缓冲区有数据了,就会回调这个函数。这个函数就是ep_poll_callback。这里你会发现,原来内核设计也是充满了事件回调的原理。
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi>nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
ep_poll_callback
ep_poll_callback函数的作用非常重要,它将内核事件真正地和epoll对象联系了起来。它又是怎么实现的呢?
首先,通过这个文件的wait_queue_entry_t对象找到对应的epitem对象,因为eppoll_entry对象里保存了wait_queue_entry_t,根据wait_queue_entry_t这个对象的地址就可以简单计算出eppoll_entry对象的地址,从而可以获得epitem对象的地址。这部分工作在ep_item_from_wait函数中完成。一旦获得epitem对象,就可以寻迹找到eventpoll实例。
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
接下来,进行一个加锁操作。
spin_lock_irqsave(&ep->lock, flags);
下面对发生的事件进行过滤,为什么需要过滤呢?为了性能考虑,ep_insert向对应监控文件注册的是所有的事件,而实际用户侧订阅的事件未必和内核事件对应。比如,用户向内核订阅了一个套接字的可读事件,在某个时刻套接字的可写事件发生时,并不需要向用户空间传递这个事件。
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
接下来,判断是否需要把该事件传递给用户空间。
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
/*
* Activate ep->ws since epi->ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}
如果需要,而且该事件对应的event_item不在eventpoll对应的已完成队列中,就把它放入该队列,以便将该事件传递给用户空间。
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}
我们知道,当我们调用epoll_wait的时候,调用进程被挂起,在内核看来调用进程陷入休眠。如果该epoll实例上对应描述字有事件发生,这个休眠进程应该被唤醒,以便及时处理事件。下面的代码就是起这个作用的,wake_up_locked函数唤醒当前eventpoll上的等待进程。
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!((unsigned long)key & POLLFREE)) {
switch ((unsigned long)key & EPOLLINOUT_BITS) {
case POLLIN:
if (epi->event.events & POLLIN)
ewake = 1;
break;
case POLLOUT:
if (epi->event.events & POLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up_locked(&ep->wq);
}
查找epoll实例
epoll_wait函数首先进行一系列的检查,例如传入的maxevents应该大于0。
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;
和前面介绍的epoll_ctl一样,通过epoll实例找到对应的匿名文件和描述字,并且进行检查和验证。
/* Get the "struct file *" for the eventpoll file */
f = fdget(epfd);
if (!f.file)
return -EBADF;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
if (!is_file_epoll(f.file))
goto error_fput;
还是通过读取epoll实例对应匿名文件的private_data得到eventpoll实例。
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;
接下来调用ep_poll来完成对应的事件收集并传递到用户空间。
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
ep_poll
还记得 第22讲 里介绍epoll函数的时候,对应的timeout值可以是大于0,等于0和小于0么?这里ep_poll就分别对timeout不同值的场景进行了处理。如果大于0则产生了一个超时时间,如果等于0则立即检查是否有事件发生。
*/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
if (timeout > 0) {
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
} else if (timeout == 0) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
接下来尝试获得eventpoll上的锁:
spin_lock_irqsave(&ep->lock, flags);
获得这把锁之后,检查当前是否有事件发生,如果没有,就把当前进程加入到eventpoll的等待队列wq中,这样做的目的是当事件发生时,ep_poll_callback函数可以把该等待进程唤醒。
if (!ep_events_available(ep)) {
/*
* Busy poll timed out. Drop NAPI ID for now, we can add
* it back in when we have moved a socket with a valid NAPI
* ID onto the ready list.
*/
ep_reset_busy_poll_napi_id(ep);
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
紧接着是一个无限循环, 这个循环中通过调用schedule_hrtimeout_range,将当前进程陷入休眠,CPU时间被调度器调度给其他进程使用,当然,当前进程可能会被唤醒,唤醒的条件包括有以下四种:
- 当前进程超时;
- 当前进程收到一个signal信号;
- 某个描述字上有事件发生;
- 当前进程被CPU重新调度,进入for循环重新判断,如果没有满足前三个条件,就又重新进入休眠。
对应的1、2、3都会通过break跳出循环,直接返回。
//这个循环里,当前进程可能会被唤醒,唤醒的途径包括
//1.当前进程超时
//2.当前进行收到一个signal信号
//3.某个描述字上有事件发生
//对应的1.2.3都会通过break跳出循环
//第4个可能是当前进程被CPU重新调度,进入for循环的判断,如果没有满足1.2.3的条件,就又重新进入休眠
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
/*
* Always short-circuit for fatal signals to allow
* threads to make a timely exit without the chance of
* finding more events available and fetching
* repeatedly.
*/
if (fatal_signal_pending(current)) {
res = -EINTR;
break;
}
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
//通过调用schedule_hrtimeout_range,当前进程进入休眠,CPU时间被调度器调度给其他进程使用
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
如果进程从休眠中返回,则将当前进程从eventpoll的等待队列中删除,并且设置当前进程为TASK_RUNNING状态。
//从休眠中结束,将当前进程从wait队列中删除,设置状态为TASK_RUNNING,接下来进入check_events,来判断是否是有事件发生
__remove_wait_queue(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);
最后,调用ep_send_events将事件拷贝到用户空间。
//ep_send_events将事件拷贝到用户空间
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
ep_send_events
ep_send_events这个函数会将ep_send_events_proc作为回调函数并调用ep_scan_ready_list函数,ep_scan_ready_list函数调用ep_send_events_proc对每个已经就绪的事件循环处理。
ep_send_events_proc循环处理就绪事件时,会再次调用每个文件描述符的poll方法,以便确定确实有事件发生。为什么这样做呢?这是为了确定注册的事件在这个时刻还是有效的。
可以看到,尽管ep_send_events_proc已经尽可能的考虑周全,使得用户空间获得的事件通知都是真实有效的,但还是有一定的概率,当ep_send_events_proc再次调用文件上的poll函数之后,用户空间获得的事件通知已经不再有效,这可能是用户空间已经处理掉了,或者其他什么情形。还记得 第22讲 吗,在这种情况下,如果套接字不是非阻塞的,整个进程将会被阻塞,这也是为什么将非阻塞套接字配合epoll使用作为最佳实践的原因。
在进行简单的事件掩码校验之后,ep_send_events_proc将事件结构体拷贝到用户空间需要的数据结构中。这是通过__put_user方法完成的。
//这里对一个fd再次进行poll操作,以确认事件
revents = ep_item_poll(epi, &pt);
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
Level-triggered VS Edge-triggered
在 前面的 文章 里,我们一直都在强调level-triggered和edge-triggered之间的区别。
从实现角度来看其实非常简单,在ep_send_events_proc函数的最后,针对level-triggered情况,当前的epoll_item对象被重新加到eventpoll的就绪列表中,这样在下一次epoll_wait调用时,这些epoll_item对象就会被重新处理。
在前面我们提到,在最终拷贝到用户空间有效事件列表中之前,会调用对应文件的poll方法,以确定这个事件是不是依然有效。所以,如果用户空间程序已经处理掉该事件,就不会被再次通知;如果没有处理,意味着该事件依然有效,就会被再次通知。
//这里是Level-triggered的处理,可以看到,在Level-triggered的情况下,这个事件被重新加回到ready list里面
//这样,下一轮epoll_wait的时候,这个事件会被重新check
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
epoll VS poll/select
最后,我们从实现角度来说明一下为什么epoll的效率要远远高于poll/select。
首先,poll/select先将要监听的fd从用户空间拷贝到内核空间, 然后在内核空间里面进行处理之后,再拷贝给用户空间。这里就涉及到内核空间申请内存,释放内存等等过程,这在大量fd情况下,是非常耗时的。而epoll维护了一个红黑树,通过对这棵黑红树进行操作,可以避免大量的内存申请和释放的操作,而且查找速度非常快。
下面的代码就是poll/select在内核空间申请内存的展示。可以看到select 是先尝试申请栈上资源, 如果需要监听的fd比较多, 就会去申请堆空间的资源。
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
/* Allocate small arguments on the stack to save memory and be faster */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
...
第二,select/poll从休眠中被唤醒时,如果监听多个fd,只要其中有一个fd有事件发生,内核就会遍历内部的list去检查到底是哪一个事件到达,并没有像epoll一样, 通过fd直接关联eventpoll对象,快速地把fd直接加入到eventpoll的就绪列表中。
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
...
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
...
总结
epoll维护了一棵红黑树来跟踪所有待检测的文件描述字,红黑树的使用减少了内核和用户空间大量的数据拷贝和内存分配,大大提高了性能。
同时,epoll维护了一个链表来记录就绪事件,内核在每个文件有事件发生时将自己登记到这个就绪事件列表中,通过内核自身的文件file-eventpoll之间的回调和唤醒机制,减少了对内核描述字的遍历,大大加速了事件通知和检测的效率,这也为level-triggered和edge-triggered的实现带来了便利。