今儿我们就从源码入手,来帮助大家简单理解一下 epoll 的实现原理,并在后边分析一下,大家都说 epoll 性能好,那到底是好在哪里。
epoll 简介
1、epoll 的简单使用
我们先来看下 epoll 的简单使用。
首先来看下不用 epoll 的时候,我们可能会怎样去创建一个 socket 链接的伪代码:
// 创建一个 socket
socket_fd = socket(AF_INET,SOCK_STREAM,0);
// 给 socket 绑定本地端口和地址
local_addr.sin_port = htons(PORT);
local_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(socket_fd,(struct sockaddr*)&local_addr,sizeof(struct sockaddr_in));
// 监听客户端发来的链接
ret = listen(socket_fd,backlog);
// 死循环
for(;;){
// 当用户调用了 connect 后服务端会触发 accept
accept_fd = accept( socket_fd, (struct sockaddr *)&remote_addr, &addr_len );
for(;;){
// 从线程池里捞一条线程然后把这个 accept 交给这条线程
// 然后线程中去做 recv()
get_thread_from_pool(accept_fd)
}
}
不同语言可能写法都不太一样,但是大概流程都是先创建个 socket,然后给 socket 绑定上本地端口和 ip,以便客户端能通过这俩信息找到自己,之后监听这个 socket,再然后死循环中用 accept 来接受用户的 connect,接收到之后,把链接的 fd 扔给一条新的线程中去做 read 之类的操作。
我们再来简单看下用 epoll 的时候大概会怎么写:
int main() {
// 创建 socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 给 socket 绑定地址和 port 并监听
myaddr.sin_port = htons(PORT);
myaddr.sin_addr.s_addr = INADDR_ANY;
bind(sockfd, (const struct sockaddr *)&myaddr, sizeof(myaddr))
listen(sockfd)
// 创建 epoll
int efd = epoll_create(1);
// 创建 epoll 的事件
struct epoll_event evt = {
.events = EPOLLIN,
.data.fd = sockfd,
};
// 把 socket 交给 epoll 做托管
epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &evt)
struct epoll_event events[MAX];
while (1) {
// 触发 epoll 的等待, 等用户的 connect 以及 send
int num = epoll_wait(efd, events);
for (i = 0; i < num; i++) {
if (events[i].events & EPOLLIN) {
// 如果是 socketfd 收到了 connect
if (sockfd == events[i].data.fd) {
// 就把这条链接的 fd 也放到 epoll 中
int cn_fd = accept(sockfd, NULL, NULL);
struct epoll_event ac_evt = {
.events = EPOLLIN,
.data.fd = cn_fd,
};
epoll_ctl(efd, EPOLL_CTL_ADD, cn_fd, &ac_evt);
} else {
// 如果是收到了用户的 send, 那就从线程池里捞出一条线程
// 然后里头再去做 read 之类的操作
get_thread_from_pool(events[i].data.fd);
}
}
}
}
}
上边的代码简单来讲也是先创建 socket,然后创建 epoll,之后将 socket 交给 epoll 管理,随后启动死循环,当用户 connect 了之后再把这个 accept 的 fd 同样托管给 epoll,这样当用户发消息过来之后就会从线程池中捞一条线程,然后用这条线程去做 read 之类的操作。
用以及不用 epoll 大概就是上边这两种情况,这里都是伪代码,具体一点的代码可以很容易搜到,大家如果想自己试的话可以去搜一搜,这里就简单带过了。
2、epoll 的系统调用
epoll 主要有仨系统调用:
- epoll_create: 创建一个 epoll 对象
- epoll_ctl: 把要管理的对象添加到 epoll 中
- epoll_wait: hang 住当前线程等待被托管的东西里有 IO 发生
epoll 实现原理
epoll 的实现原理可能会有点绕,如果不想看中间那大坨源代码的话,大家可以直接跳到后边 “几个系统调用总结” 这部分来看最后的总结。
1、epoll 是文件系统
首先 epoll 深得 unix 设计哲学的精髓,他也和 socket 一样,是个文件系统,它的主要系统调用实现在内核源码的 “fs/eventpoll.c” 文件中。
在之前的文章中介绍过 Linux 的文件系统以及 sockfs,并且当时提到文件系统有基于磁盘的,也有基于内存的。当时介绍的 sockfs 就是基于内存的文件系统。很明显,这里的 epoll 文件系统也是基于内存的一种文件系统。
我们在之前的文章中提到,对于基于磁盘的文件系统比如 ext4 等他们都在内存中有自己的 inode 数据结构,这个 inode 数据结构上保存了很多对当前文件系统的操作方法以及属性。然后用户态在使用的时候,大概就是在线程的 task_struct 结构体上找到 files 属性中的 fd_array 或者 fd_table,然后通过 fd 找到对应的 file 结构体,之后通过 file 结构体,就能找到对应的 inode 然后做一些文件相关的操作。
而对于类似 sockfs 或者 epoll 这种基于内存的文件系统来讲,他们虽然也有 inode 属性,但对他们来讲,这个 inode 是一种 “假的” inode,也就是说对于 epoll 来讲,它的 inode 作用不大,而真正有用的,是挂载在 file 结构体上的 private_data 属性,这点它和 socket 一样。
到这儿为止,如果感觉不是很清晰的话,可以去看下之前介绍 sockfs 的文章,或者也可以简单地记,就是:
- epoll 和 socket 一样也是一种文件系统
- 当用户调用了 epoll_create 之后会返回 epoll 的 fd
- 通过这个 fd,可以在 task_struct 的 files 上找到对应的 epoll 的 file 结构体
- 在这个 file 结构体上可以拿到一个 private_data 属性,这个 private_data 属性的值,就是 epoll 内核中的数据结构。至于这个结构是什么东西,咱们后边再说。
2、epoll_create
首先我们来分析一下想使用 epoll 的话,一定要走的第一个系统调用 “epoll_create”。
上图是源码中的实现,我们来简单看下:
static int do_epoll_create(int flags) {
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 创建了一个 eventpoll 结构体
error = ep_alloc(&ep);
// 生成文件描述符
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
// 创建 epoll 对应的 file 结构体
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
ep->file = file;
// 给它绑上 fd
fd_install(fd, file);
return fd;
}
上边是把这个系统调用最外层的精华摘了出来,主要做的事儿,就是创建了一个 eventpoll 结构体,咱们上一小节说的通过 fd 找到 file 然后找到的那个 private_data 属性,其实就是这个 eventpoll 结构体。
通过上边源码可以看到,这个 eventpoll 结构体作为 priv 参数交给了 “file->private_data” 方法。另外在源代码中也可以看到,epoll 对应的这个 file 结构体,是用一个叫 “alloc_file_pseudo” 的方法创建的,其中这个 “pseudo” 是 “假的” 的意思,这也表明了对于 epoll 这种基于内存的文件系统,它的 file 结构体相比基于磁盘的文件系统没有那么 “沉”。
接下来我们回到上边创建完了 eventpoll 结构体之后,epoll_create 系统调用中会获取一个未使用的文件描述符,然后给 epoll 创建一个 file 结构体,并把这个 file 结构体和 fd 做一个 “fd_install”,也就是给绑定一下子,这样通过这个 fd 就能在当前线程的 task_struct 上找到对应的这个 eventpoll 数据结构了。
上边我们反复提到 eventpoll 这个结构体,从 epoll_create 系统调用的源码也能看出,这个系统调用主要就是创建出了这个一个结构体,并且能让我们通过 fd 找到他,那他到底是个啥呢?我们来下源码:
这个 eventpoll 结构体上有很多属性,其中最重要的,我们只需要记住三个就好:
- wq: 一个存放等待事件的队列
- rdllist: 一个存放就绪事件的队列
- rbr: 一颗红黑树
至于这仨分别是干啥的,一会儿在后边的文章中就能看到了。
这里简单总结一下,使用 epoll 的第一步!调用 epoll_create 方法,该方法做的事情就是创建了一个 eventpoll 结构体,并且能让用户态通过 fd 找到这个 eventpoll 结构体。这个结构体上重点有仨属性,一个用来存放等待事件的队列,一个用来存放就绪事件的队列,以及一颗红黑树。
2、epoll_ctl
接下来我们来看使用 epoll 的第二步,使用 epoll_ctl 系统调用,将要托管的 socket fd 交给 epoll 托管。代码大概长这样:
epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &evt)
这一步就是将 socket 交给 epoll 管理,我们来简单介绍下它里头做了什么事儿,这里可能有些逻辑会比较绕,大家可以自己再去看看源码加深一下理解:
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) {
struct eventpoll *ep;
// 根据 epoll 的 fd 找到对应的 eventpoll 的 file 结构体
f = fdget(epfd);
// 根据 socket 的 fd 找到对应的 socket 的 file 结构体
tf = fdget(fd);
// 检查是否支持 poll
if (!file_can_poll(tf.file))
goto error_tgt_fput;
// 找到对应的 eventpoll 结构体
ep = f.file->private_data;
switch (op) {
// 添加一个 socket 到 epoll 中
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
}
break;
case EPOLL_CTL_DEL:
case EPOLL_CTL_MOD:
}
}
上边是 epoll_ctl 这个系统调用的主要代码,里头做的事情乍一看也很简单:
- 根据 epoll 的 fd 找到对应的 file 结构体,这个结构体上能找到 eventpoll 结构体
- 根据 socket 的 fd 找到对应的 file 结构体,这个结构体上能找到 socket 结构体
- 调用了 ep_insert 方法,将 socket 插入到 eventpoll 结构体中
下面我们来看看 “ep_insert” 这个方法做了啥:
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event, struct file *tfile, int fd, int full_check)
{
// 初始化一个 epitem 数据结构
struct epitem *epi;
// 初始化一个等待队列,但它其实是个 struct 结构体
// 上边只有一个 poll_table 结构体和 epitem 结构体
struct ep_pqueue epq;
// 初始化 epitem 结构上的 pwqlist 属性
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 这里只做了 ffd->file = file 以及 ffd->fd = fd
ep_set_ffd(&epi->ffd, tfile, fd);
// 给等待队列的 epitem 赋值
epq.epi = epi;
// 给等待队列的 poll_table 赋值
// 赋的值可以简单地认为就是后边这个叫做 “ep_ptable_queue_proc” 的函数
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 这里会调用上边那行的 “ep_ptable_queue_proc” 方法
// 作用可以简单理解成给 epi 的 pwqlist 这条链表上
// 添加了一个叫做 “ep_poll_callback” 的回调函数
revents = ep_item_poll(epi, &epq.pt, 1);
// 把这个 epitem 插入到 eventpoll 的红黑树里
ep_rbtree_insert(ep, epi);
}
这个 ep_insert 方法中,从宏观上来看,主要做的事情就是创建了一个红黑树的节点,这个节点上保存了用户传进来的 socket 相关的信息,然后主要还调用了一个 ep_item_poll 用来初始化等待队列。
这里最最最主要的操作其实就是这个 “ep_item_poll” 方法了,这个方法主要是往 epi 的 pwqlist 这条链表上挂了个回调函数名字叫 “ep_poll_callback”,那么这个 epi 的 “pwqlist” 又是谁,是干嘛的呢?这里我们直接揭秘,其实这个 “pwqlist” 就是用户传进来的那个 socket 身上的 “等待队列”。我们来详细看下源码,这里会比较绕,我尽量说得简单点:
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth) {
pt->_key = epi->event.events;
if (!is_file_epoll(epi->ffd.file))
return vfs_poll(epi->ffd.file, pt) & epi->event.events;
}
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
return file->f_op->poll(file, pt);
}
虽然我们上边说这块儿会比较绕,但实际上这个 “ep_item_poll” 的源码还算是比较短的,我们把最重要的摘出来其实就这么几行,可以看到它里头调用了 vfs_poll 方法,vfs_poll 方法中又去调用了 file -> f_op -> poll 方法。
从这里我们就可以看出,如果你的文件系统实现了 poll 方法的话,其实理论上是都可以被 epoll 来托管的。那么这里这个 poll 方法是谁呢?这里不卖关子直接说,其实这个 file -> f_op -> poll 方法就是 tcp 协议自己实现的 poll 方法,也就是 “tcp_poll” 方法。
这里简单解释一下这个 tcp_poll 方法是怎么来的:首先大家都知道 socket 这个东西,但其实 socket 之下还有更重要的一个叫做 “sock” 的结构。对于这个 socket 和 sock 应该怎么理解呢?其实可以把 socket 理解成 “协议簇”,把 sock 理解为真正的 “协议”,socket 是用户层的概念,而 sock 则是真的要和一种底层的协议做绑定的,比如 tcp 协议或者 udp 协议。然后不同的协议实现的什么 read 方法,send 方法,poll 方法等,就会被挂载到这个 sock 结构体上,也就是说,当用户在用户侧调用了一个什么 send 方法或者 recv 方法啥的,真正的调用逻辑是 “socket -> sock -> ops -> tcp_recv(或者 udp_recv)”。所以上边的 ep_item_poll 方法里头调用的 poll 方法,就是 socket -> sock -> ops -> tcp_poll 方法。
也就是说,这里可以简单地理解一下,当用户态调用了 “epoll_ctl” 并把一个 socket 传进来的时候,这个系统调用会调用 socket 下层的 poll 接口,而实现了这个 poll 接口的,就是下层真正的协议,比如 tcp 协议,此时就会调用 tcp 协议自己实现的 tcp_poll 方法。
好了回过头继续看这个 tcp_poll 方法,注意 “ep_item_poll” 在调用这个 tcp_poll 方法的时候,把一个 “poll_table” 类型的属性作为参数传给了 tcp_poll,这个 poll_table 是谁呢,我们暂时回头去看下 “ep_insert” 方法中的那个 “init_poll_funcptr” 方法:
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
可以看到 init_poll_funcptr 接收了一个 “ep_ptable_queue_proc” 方法,并把这个方法放到 “poll_table” 这个结构体的 “_qproc” 属性上,这里大家先强行记住这个 “_qproc”,记住这个 poll_table 结构体上有个 _qproc 属性,并且指向了一个叫 “ep_ptable_queue_proc” 的函数。
然后我们往下看,上边说到 “ep_item_poll” 会调用 tcp 协议实现的 poll 方法并把这个 poll_table 作为参数传进去,那我们来看看 tcp_poll 中实现了啥:
我们顺着 tcp_poll 的这条调用链看下去,最终在 poll_wait 中看到了一个眼熟的东西,诶!就是上边让大家记住的那个 “_qproc” 属性,它指向了 “ep_ptable_queue_proc” 方法。
嗷!到这儿我们能反应过来,在 epoll_ctl 这个系统调用中,调用了底层协议的 poll 方法,并且把 epoll 那层的一个函数作为参数传给了底层协议的 poll 方法,然后底层协议的 poll 方法又会调用这个函数。
是不是觉得有点绕了,还有更绕的。
我们来看上图那个 poll_wait 函数,你看它调用 _qproc 方法时候传的参数是个谁?其中是不是有个叫做 “wait_address” 的东西,然后您再往上看上一张图的 “sock_poll_wait” 方法,在调用这个 “poll_wait” 方法时,传进来的这个 “wait_address” 是谁呢?没错,正是 socket 的 wq.wait,也就是 socket 上的一个等待队列。
好,到这儿我们在梳理一下流程,当用户调用 “epoll_ctl” 并传进来一个 socketfd 的时候,epoll ctl 内部会调用这个 socket 底层的协议实现的 poll 方法,并把自己的一个 poll_table 属性传进去,然后在底层协议比如 tcp 协议实现的 poll 方法中,又会调用上层的 epoll_ctl 传进来的这个 poll_table 上的 _qproc 方法,并把自己这个 socket 身上的等待队列作为参数传给这个 _qproc 方法,而这个 _qproc 方法指向的是 “ep_ptable_queue_proc” 这个函数。所以接下来我们来看 “ep_ptable_queue_proc” 方法:
static void ep_ptable_queue_proc(
struct file *file,
wait_queue_head_t *whead,
poll_table *pt
) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
add_wait_queue(whead, &pwq->wait);
}
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
wq_entry->private = NULL;
wq_entry->func = func;
}
我们捡主要的看,这个函数中,最重要的两个步骤就是首先是调用 “init_waitqueue_func_entry” 方法,这个方法很简单,直接贴在上边了,就是把 “ep_poll_callback” 这个方法给挂到 pwp->wait 上边。接下来调用 “add_wait_queue”,把这个挂载了 “ep_poll_callback” 方法的 pwp->wait 结构给挂载到 whead 这个队列上,那这个 whead 是谁呢,你一定能想到,就是上边 tcp_poll 在调用这个 “ep_ptable_queue_proc” 方法时传进来的 socket 自己身上的 wq 等待队列。
到这儿,我们总结一下 epoll_ctl 都做了啥:
- 在 epoll_ctl 中调用了传进来的那个 socket 底层协议的 poll 方法,比如底层协议如果是 tcp 的话,那这个方法就是 tcp_poll
- epoll_ctl 在调用 tcp_poll 时,把自己这边的一个回调函数传给了 tcp_poll
- tcp_poll 中又会调用上层 epoll_ctl 传给他的这个回调函数,并且 tcp_poll 把自己的 socket 身上的等待队列作为参数传给这个 epoll_ctl 传下来的回调函数
- 这个 epoll_ctl 中的会调用拿到了底层协议自己的 wq 等待队列后,往这个等待队列中推入了一个数据结构,这个数据结构中只有一个回调函数,叫 “eo_poll_callback”
- 最后把这个 socket 插到 epoll 内部的红黑树上
好了到这儿我们就把 epoll_ctl 主要做的事儿都说完了。可以发现这套流程如果要是自己一点点看的话,确实会比较绕,因为它里边相当于是上层的 epoll 和下层的协议都是可以替换的,只要下层协议实现了 poll 方法,然后上层能把自己的回调注入进入,之后下层的 poll 方法再把自己的等待队列注入给上层的回调函数,这就 ok 了,有一种双向依赖注入的感觉。还挺(má)妙(fán)的是吧。
3、epoll_wait
说完了 epoll_create 和 epoll_ctl 我们来看是用 epoll 的最后一个重要的系统调用 “epoll_wait”。
epoll_wait 主要调用的是 “do_epoll_wait” 中的 “ep_poll” 方法,我们来看一下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
// 先判断 eventpoll 上的就绪队列是不是有东西
// 有的话直接吐给用户
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
eavail = ep_events_available(ep);
if (eavail)
goto send_events;
// 使用 current 初始化一个等待项
init_waitqueue_entry(&wait, current);
// 把等待项给干到 eventpoll 结构体的 wq 队列上
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
// hang 住当前线程
set_current_state(TASK_INTERRUPTIBLE);
}
}
对于这个 “ep_poll” 整体上来看,做的事情比较直观,主要就是:
- 先看看 epoll 里头的就绪队列是不是已经有东西了。还记得最开始我们介绍 eventpoll 时候说它里头有三个重要的东西,一个是 “就绪队列”,一个是 “等待队列”,还有一颗 “红黑树”,此时看的就是这个 eventpoll 中的 “就绪队列”。
- 就绪队列里没东西的话会创建一个所谓的 “等待项”,这个是啥呢,后边再说。
- 创建好等待项之后把这个等待项给挂载到 eventpoll 的 “等待队列” 也就是那个 wq 上。
- 将当前线程从操作系统的调度队列中拿出来,hang 住当前线程。(current 总是指向当前正在运行的线程,内部是通过汇编+寄存器实现的,这里可以当成一个全局的环境变量)
所以简单来说 epoll_wait 做的最主要的事儿就是往内部的 “等待队列” 中插入了一个 “等待项” 并且让当前线程睡觉。接下来我们来看上边比较重要的一个 “等待项” 是啥
简单理解,所谓等待项就是一个结构体,上边会放一个 private 属性,该属性指向 current 也就是当前线程的 task_struct 结构体,还有个 func 属性指向一个名叫 “default_wake_function” 的回调函数。
然后这个等待项,就会被插入到 eventpoll 的 wq “等待队列” 上
到这儿为止,我们就把 epoll_wait 主要做的事情也说完了。
4、几个系统调用总结
接下来我们简单总结一下,epoll_create 和 epoll_ctl 以及 epoll_wait 都大概做了哪些事情:
首先是 epoll_create,它是使用 epoll 的第一步,它里边主要是创建了三个数据结构,一个 “等待队列”,一个 “就绪队列”,以及一颗 “红黑树”。
如果用伪代码表示的话,那么当你调用了 epoll_create 之后,此时通过这个系统调用返回的 fd,你能拿到这么一个结构体:
ep = {
等待队列 = [],
就绪队列 = [],
红黑树 = [],
}
然后是 epoll_ctl,它允许你将实现了 poll 方法的文件系统作为参数交给 epoll 管理,epoll_ctl 内部会调用真实的底层协议实现的 poll 方法,并把 epoll 这一层的一个回调函数作为参数传给 poll 方法,然后底层协议的 poll 方法中会调用 epoll 传进来的那个回调函数,并且协议会把自己身上的等待队列作为参数交给 epoll 的那个回调函数来处理。而这个回调函数中则会创建一个等待项,这个等待项上有个回调函数叫 “ep_poll_callback”,并且把这个等待项给塞到底层协议传过来的等待队列上。
如果用伪代码表示的话,那么当你调用了 epoll_ctl 并把一个 socket 交给它管理之后,此时 fd 对应的结构体就变成了这样,它的红黑树中会多一个节点:
ep = {
等待队列 = [],
就绪队列 = [],
红黑树 = [
socket1 = {
等待队列 = [{ callback: ep_poll_callback }]
}
],
}
最后是 epoll_wait,它会 hang 住当前线程,以等待被托管的 fd 身上有 IO 事件发生。它内部会创建一个等待项,注意这个等待项和上边 epoll_ctl 中的那个等待项不是一个东西,上边 epoll_ctl 的等待项是塞给了 socket 的等待队列,而且里头只有一个叫 “ep_poll_callback” 的回调函数,而这里的 epoll_wait 的等待项是真的塞给了 epoll 自己的 eventpoll 上的等待队列,并且它上边除了有个一个叫做 “default_wake_function” 的回调函数,同时还保存了 current 也就是当前线程对应的 task_struct 结构体。都弄完了之后就会出让 cpu 让当前线程睡觉觉。
如果用伪代码表示的话,那么当你调用了 epoll_wait 之后,此时的 fd 能找到的结构体就变成了这样,这个 epoll 自己的等待队列上会多一个等待项:
ep = {
等待队列 = [{ callback: default_wake_function, private: current }],
就绪队列 = [],
红黑树 = [
socket1 = {
等待队列 = [{ callback: ep_poll_callback }]
}
],
}
5、 当来消息了
当用户态执行完了上边仨系统调用之后,这条线程就 hang 在这儿了,知道有客户端发消息过来。那么接下来我们看看当用户发消息过来之后会发生什么。
太具体的网卡收包的过程咱们就不说了先,大概过程总之就是网卡收到数据之后触发硬中断以及软中断,软中断从缓冲区中把收到的数据处理成 sk_buffer 这个数据结构,然后从网卡驱动也就是链路层这一层开始往上送到网络层再送到传输层,在网络层将 sk_buffer 送到传输层之前,它要有一步是根据 sk_buffer 中的协议,来找到要使用哪个传输层协议:
在 tcp 对应的 tcp_v4_rc 方法中,就会根据 ip 以及 port 去查找对应的 socket:
int tcp_v4_rcv(struct sk_buff *skb) {
struct sock *sk;
sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,
th->dest, sdif, &refcounted);
ret = tcp_v4_do_rcv(sk, skb);
}
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb) {
tcp_rcv_established(sk, skb);
}
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb) {
tcp_data_ready(sk);
}
void tcp_data_ready(struct sock *sk) {
sk->sk_data_ready(sk);
}
我们顺着 tcp_v4_rcv 这条调用链看下去,最后会发现最终会调用到一个叫 “sk->sk_data_ready” 的方法,这个方法从名字上看就能看出来,它的作用是当 “数据准备好了” 时候调用的,那么这个 sk_data_ready 是谁呢?其实这个方法是在当用户创建 socket 以及内部的 sock 结构时候被挂到上边去的,由于创建 socket 的过程也比较繁琐,这里我们就不再细说了,在之前的文章中我们有过介绍。我们记住一个结论,就是这个 sk_data_ready 属性,指向的就是一个叫做 “sock_def_readable” 的方法:
static void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
}
该方法中通过 “wake_up_interruptible_sync_poll” 方法执行等待队列上的回调函数。不过这里有个迷惑人的地方,就是它的名字虽然带有 “wake_up” 唤醒字样,但实际上这里其实并一定会唤醒当前线程。如果你在上层是对 socket 做了类似 recv 之类的操作的话,那确实这里是会做唤醒,但是在 epoll 的情况下,这里并不会直接唤醒线程,为啥呢?还记得上边我们介绍 epoll 的三个相关系统调用,当你把 epoll_create、epoll_ctl、epoll_wait 仨东西都调用完之后,通过 fd 能拿到的是啥玩意儿么,来看下边回顾下:
ep = {
等待队列 = [{ callback: default_wake_function, private: current }],
就绪队列 = [],
红黑树 = [
socket1 = {
等待队列 = [{ callback: ep_poll_callback }]
}
],
}
就是这么个玩意儿,它里头有两个 “等待队列”,其中一个在 socket 上,另外一个在 epoll 上。这里这个 “sock_def_readable” 方法中的 “wake_up_interruptible_sync_poll” 其实是会去 socket 上的等待队列中去拿那个等待项,这个等待项里只有一个 callback 指向了 ep_poll_callback 回调函数。其实对于非 epoll 的情况下,如果上层调用的 recv 的话, 这个 socket 的等待项中,确实是会还有个 private 指向 current 的,不过这里我们是 epoll 的场景,对于其他场景大家可以自行研究,如果把 epoll 这个场景整明白了,其他场景其实也大同小异。
总之呢,这里会调用这个 socket 的等待队列中的 ep_poll_callback 方法:
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key) {
// 先找到这个 socket 对应的红黑树上的那个节点
struct epitem *epi = ep_item_from_wait(wait);
// 再找到管理着这个 socket 的那个 eventpoll 结构体
struct eventpoll *ep = epi->ep;
// 把这个 socket 对应的红黑树的那个节点给添加到 eventpoll 的就绪队列中
list_add_tail_lockless(&epi->rdllink, &ep->rdllist)
// 看 eventpoll 的等待队列中是否有等待项, 然后尝试唤醒
if (waitqueue_active(&ep->wq)) {
wake_up(&ep->wq);
}
}
也就是说,当托管给 epoll 的某个 socket 上接收到了消息之后,tcp 的协议栈那层会主动触发一个唤醒用的 callback,这个 callback 是 “ep_poll_callback”,然后这个 “ep_poll_callback” 中又会找到红黑树上对应的节点,并把这个节点放到 epoll 内部的 “就绪队列中”,此时的伪代码可表示为:
ep = {
等待队列 = [{ callback: default_wake_function, private: current }],
就绪队列 = [ socket1 ],
红黑树 = [],
}
简单来讲就是当某个 socket 收到消息后,这个 socket 就不在红黑树里呆着了,会被放到 epoll 的就绪队列中。之后触发 “wake_up” 方法,该方法就会去 epoll 自己的等待队列上去看是否有等待项,有的话触发它的 callback,这里如上伪代码表示,就是触发了 “default_wake_function” 方法:
里边触发了一个 try_to_wake_up,我们注意看这个函数的参数是谁,是一个叫 “curr->private” 的东西,这个是谁呢?诶!就是上边伪代码中 epoll 的等待队列中的等待项里的 private 对应的那个 current,也就是之前调用了 epoll_wait 的那条线程对应的 task_struct。
换句话说,当调用了 try_to_wake_up(curr -> private) 之后,这条被 hang 住的线程,就会被重新加入到可运行的任务队列中,操作系统会在适当的时机继续执行它。
那么重新回到哪儿执行呢?还记得我们是在哪里 hang 住当前线程的么?是在调用了 epoll_wait 时,内部执行了一个叫做 “ep_poll” 的方法里边 hang 住的,忘了的话可以往上翻一番看一看那个 “ep_poll” 方法。所以继续执行的话,就可以执行到 “ep_send_events”,也就是会把当前就绪队列中的东西返回给用户态,最后就是用户态拿到咔咔用就行了~
到这儿,我们总结一下当数据包来了之后会发生了:
- 网卡收到包后一路往上送,送到 tcp 那层后
- tcp 那层会根据 ip 和 port 找到对应的 socket
- 触发 socket 上的唤醒函数
- 该函数主要是从 socket 的等待队列中获取等待项,并触发其中的回调函数
- 这个回调函数中会找到这个 socket 对应的红黑树节点,并把这个节点加入到 epoll 自己的 “就绪队列” 中
- 最后查看 epoll 自己的 “等待队列” 中,是否有等待项,有的话触发其中的回调函数
- 这个回调函数会拿到之前保存的 private 属性,也就是 task_struct 进行线程唤醒
- 唤醒后的线程从之前 hang 住的地方重新开始执行,会把 epoll “就绪队列” 中的都吐给用户态去使用
epoll 的性能高在哪儿?
到这里我们终于说完了 epoll 的基本实现原理,现在我们可以回过头来看一看,都说 epoll 性能高,那到底高在哪儿呢?
我们首先来看当不使用 epoll 的时候,我们可能会这么用 socket:
listenfd = socket(xxxx)
for {
conn = accpet(listenfd)
// 开个新线程或者从线程池里捞一条线程去处理 conn
// 这条线程里去 read,write
start_new_process(conn)
}
我们会先死循环中等待客户端的链接,每来一个链接,就开启一条新线程或者从池子里捞,用这条线程去处理 conn。
当我们使用 epoll 的时候,我们可能会这么用:
listenfd = socket(xxxx)
epoll_ctl(listenfd)
for {
nums = epoll_wait(&events)
for (i = 0; i < nums; i++) {
if (events[i].data.fd == listenfd) {
connfd = accpet(listenfd)
epoll_ctl(connfd)
} else {
connfd = ep[i].data.fd
// 开个新线程或者从线程池里捞一条线程去处理 conn
// 这条线程里去 read, write
start_new_process(connfd)
}
}
}
可以看到里头其实也是会频繁的创建新线程或者从池子里捞一条线程出来用。乍一看之下,感觉用不用 epoll 好像没啥差别。但是实际上,我们可以细想一下,如果用第一种方式,我们将 accept 的 fd 交给一条新的线程之后,在其内部我们一般会怎么做呢?一般可能就是:
function new_thread(acceptfd) {
while(true) {
res = recv(acceptfd);
}
}
我们在新的线程中处理每个链接时,大概率还是会用个死循环然后里头不停地去 hang 住线程知道有用户发请求过来。那么此时这条线程就卡死在这儿了。那么如果这条线程是从线程池中捞出来的话,这条线程就暂时回不去池子里了,相当于我们可用的线程资源就少一个。
但是对于 epoll 的场景来讲,epoll 是一定能保证当前用户拿到的这个 fd 中,确定一定以及肯定是有事件发生了,所以我们即使会创建新的线程或者从池子里捞,也可以马上就让这条新的线程去对我们拿到的 fd 做处理,就不用再 hang 住这条线程了。也就是说我们可以高效地利用每一条线程。这就是 epoll 高性能的原因。
如果用 epoll 托管 epoll 会怎么样?
回到我们的标题,我们在上边的文章中说过,当你的文件系统实现了 poll 方法之后,就可以使用 epoll 来托管,我们也说过 epoll 自己就是一种文件系统,那么我们来看看 epoll 这个文件系统它能做哪些操作:
能看到它里头其实也实现了 poll 方法,所以理论上来说我们就可以用 epoll 去托管 epoll。对于这个 “ep_eventpoll_poll” 方法,里面主要调用了一个 “poll_wait” 方法:
而对于 “poll_wait” 方法,它主要是调用了一个 “_qproc” 方法。怎么样这个方法是不是眼熟,这个就和我们在上边介绍用 epoll 管理 socket 时一样,epoll_ctl 会调用 socket 的 poll 方法,然后这个 poll 方法中又会调用上层 epoll 传过来的那个回调函数。
后边的事情大家就可以尝试自己去分析分析了,这里因为过程和 socket 是差不多的,我就不再一点点分析了,我们可以直接用伪代码来表示,如果用 epoll 托管 epoll,最后的数据结构体的样子,大概如下:
ep2 = {
等待队列 = [{ private: current, callback: default_wake_function }],
就绪队列 = [],
红黑树 = [
socket2 = {
等待队列 = [{ private: null, callback: ep_poll_callback }]
},
ep1 = {
等待队列 = [{ callback: ep_poll_callback }],
就绪队列 = [],
红黑树 = [
socket1 = {
等待队列 = [{ private: null, callback: ep_poll_callback }]
}
],
}
],
}
简单来讲,就是内部的 epoll 的等待队列中的等待项,其实回调函数和 socket 的等待项中一样,也是 “ep_poll_callback” 方法,只有外层的 epoll 的等待项中才会保存当前线程的 current。
也就是说!如果我们用 epoll 去管理一个 epoll 会发生什么呢!
答案是其实啥也不会发生,和正常一样,当外层的 epoll 有了就绪事件之后,用户侧拿到的 fd 除了是 socket 的 fd,还有可能是个内部 epoll 的 fd,这个 epoll 如果想从它上边获取到内部 socket 的消息,我们还是需要对内部的这个 epoll 做正常的 epoll_wait 等操作。我这里有个简单的小 demo,大家感兴趣的话可以自己尝试一下玩一玩:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT1 13190
#define PORT2 13191
#define MAX 1023
int set_fcntl(int rws)
{
int flags = fcntl(rws, F_GETFD);
if (flags < 0)
{
perror("get fcntl errnor");
return -1;
}
flags |= O_NONBLOCK;
if (fcntl(rws, F_SETFD, flags) < 0)
{
perror("set fcntl errnor");
return -1;
}
return 0;
}
int main() {
pid_t pid = getppid();
printf("本条进程的 pid 是: %dn", pid);
// 创建 socket1
int sockfd1, sockfd2;
struct sockaddr_in myaddr1, myaddr2;
sockfd1 = socket(AF_INET, SOCK_STREAM, 0);
sockfd2 = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd1 < 0 || sockfd2 < 0) {
perror("creat sockfd1 failed");
return -1;
}
int on = 1;
if (
setsockopt(sockfd1, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 ||
setsockopt(sockfd2, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0
) {
perror("setsockopt");
return -1;
}
myaddr1.sin_family = AF_INET;
myaddr1.sin_port = htons(PORT1);
myaddr1.sin_addr.s_addr = INADDR_ANY;
myaddr2.sin_family = AF_INET;
myaddr2.sin_port = htons(PORT2);
myaddr2.sin_addr.s_addr = INADDR_ANY;
if (
bind(sockfd1, (const struct sockaddr *)&myaddr1, sizeof(myaddr1)) < 0 ||
bind(sockfd2, (const struct sockaddr *)&myaddr2, sizeof(myaddr2)) < 0
) {
perror("bind failed");
return -1;
}
if (
listen(sockfd1, 10) < 0 ||
listen(sockfd2, 10) < 0
) {
perror("listen failed");
return -1;
}
int efd = epoll_create(2);
int efd_internal = epoll_create(1);
if (efd < 0 || efd_internal < 0) {
printf("efd errnon");
return -1;
}
int cn_fd1 = accept(sockfd1, NULL, NULL);
int cn_fd2 = accept(sockfd2, NULL, NULL);
set_fcntl(cn_fd1);
set_fcntl(cn_fd2);
if (cn_fd1 < 0 || cn_fd2 < 0) {
printf("accept fd errnorn");
return -1;
}
struct epoll_event evt1 = {
.events = EPOLLIN,
.data.fd = cn_fd1,
};
struct epoll_event evt2 = {
.events = EPOLLIN,
.data.fd = cn_fd2,
};
struct epoll_event evt_internal = {
.events = EPOLLIN,
.data.fd = efd_internal,
};
// 把 fd1 添加到外部的 epoll 中
if (epoll_ctl(efd, EPOLL_CTL_ADD, cn_fd1, &evt1) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
// 把 fd2 添加到内部的 epoll
if (epoll_ctl(efd_internal, EPOLL_CTL_ADD, cn_fd2, &evt2) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
// 把内部的 epoll 添加到外部的 epoll 中
if (epoll_ctl(efd, EPOLL_CTL_ADD, efd_internal, &evt_internal) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
char buf[1024] = {0};
struct epoll_event events[MAX];
while (1) {
int i = 0;
// 这里再 wait 时, 要么是 fd1 收到数据, 要么是内部的 epoll 的 fd2 收到数据
int num = epoll_wait(efd, events, MAX, ~0);
if (num < 0) {
printf("epoll_wait events start errnon");
return -1;
}
for (i = 0; i < num; i++) {
if (events[i].events & EPOLLIN) {
if (events[i].data.fd == cn_fd1) {
printf("外部的 fd1 接收到数据n");
int len = read(cn_fd1, buf, sizeof(buf));
if (len <= 0) {
struct epoll_event ac_evt1;
if (epoll_ctl(efd, EPOLL_CTL_DEL, cn_fd1, &ac_evt1) < 0) {
printf("put accept_fd epoll errnon");
return -1;
}
close(cn_fd1);
} else {
printf("%sn", buf);
write(events[i].data.fd, buf, len);
}
} else if (events[i].data.fd == cn_fd2) {
printf("外部的 fd2 接收到数据n");
int len = read(cn_fd1, buf, sizeof(buf));
if (len <= 0) {
struct epoll_event ac_evt1;
if (epoll_ctl(efd, EPOLL_CTL_DEL, cn_fd1, &ac_evt1) < 0) {
printf("put accept_fd epoll errnon");
return -1;
}
close(cn_fd1);
} else {
printf("%sn", buf);
write(events[i].data.fd, buf, len);
}
} else if (events[i].data.fd == efd_internal) {
printf("内部的 epoll 接收到数据n");
char buf_internal[1024] = {0};
struct epoll_event events_internal[MAX];
int num_internal = epoll_wait(efd_internal, events_internal, MAX, ~0);
if (num_internal < 0) {
printf("internal epoll_wait events start errnon");
return -1;
}
int i_internal = 0;
for (i_internal = 0; i_internal < num_internal; i_internal++) {
if (events_internal[i].events & EPOLLIN) {
if (events_internal[i].data.fd == cn_fd2) {
printf("内部的 fd2 接收到数据n");
int len = read(cn_fd2, buf_internal, sizeof(buf_internal));
if (len <= 0) {
struct epoll_event ac_evt2;
if (epoll_ctl(efd_internal, EPOLL_CTL_DEL, cn_fd2, &ac_evt2) < 0) {
printf("put internal accept_fd epoll errnon");
return -1;
}
close(cn_fd2);
} else {
printf("%sn", buf_internal);
write(cn_fd2, buf_internal, len);
}
}
}
}
}
}
}
}
}
到这里,我们就把 epoll 的实现原理,以及为啥性能好,还有一个不常见的小场景都介绍了一下。