一、Libevent简介
Libevent是开源社区一款高性能的I/O框架库,其具有如下特点:
1、跨平台支持。Libevent支持Linux、UNIX和Windows。
2、统一事件源。libevent对i/o事件、信号和定时事件提供统一的处理。
3、线程安全。libevent使用libevent_pthreads库来提供线程安全支持。
4、基于reactor模式的实现。
5、轻量级,专注于网络,没有ACE那么臃肿庞大
6、可以注册事件优先级
二、Reactor 模式
2.1 Reactor简介
首先来回想一下普通函数调用的机制:程序调用某函数->函数执行,程序等待->函数将结果和控制权返回给程->程序继续处理。
Reactor 释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的事件发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”。使用 Libevent 也是向 Libevent 框架注册相应的事件和回调函数;当这些事件发生时,Libevent 会调用这些回调函数处理相应的事件(I/O 读写、定时和信号)。
2.2 Reactor 模式的优点
Reactor 模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:
1)响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/ 进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
2.3 Reactor 模式框架
使用 Reactor 模型,必备的几个组件:事件源(描述符)、Reactor 框架、多路复用机制和事件处理程序,先来看看 Reactor 模型的整体框架,接下来再对每个组件做逐一说明。
1)事件源(handle)
由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。
2)event demultiplexer——事件多路分发机制(同步事件分离器)
由操作系统提供的 I/O 多路复用机制,比如 select 和 epoll。 程序首先将其关心的句柄(事件源)及其事件注册到 event demultiplexer 上; 当有事件到达时,event demultiplexer 会发出通知“在已经注册的句柄集中,一个或多 个句柄的事件已经就绪”; 程序收到通知后,就可以在非阻塞的情况下对事件进行处理了。 对应到 libevent 中,依然是 select、poll、epoll 等,但是 libevent 使用结构体 eventop 进行了 封装,以统一的接口来支持这些 I/O 多路复用机制,达到了对外隐藏底层系统机制的目的。
3)Reactor——反应器(管理器)
定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模板函数来处理这个事件。
4) Event Handler——事件处理程序(事件处理器接口)
事件处理程序提供了一组接口,每个接口对应了一种类型的事件,供 Reactor 在相应的事件发生时调用,执行相应的事件处理。通常它会绑定一个有效的句柄。 对应到 libevent 中,就是 event 结构体。
5)具体的事件处理器
是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
2.4 Reactor 事件处理流程
三、libevent库的使用
3.1 使用步骤
1、调用event_init函数创建event_base对象。一个event_base相当于一个reactor实例。
2、创建具体的事件处理器,并设置它们所从属的reactor实例。evsignal_new和evtimer_new分别用于创建信号事件处理器和定时事件处理器,它们的统一入口是event_new函数,event_new函数成功时返回一个event类型的对象,也就是libevent的事件处理器
3、调用event_add函数,将事件处理器添加到注册事件队列中,并将该事件处理器对应的事件添加到事件多路分发器中。
4、调用event_base_dispatch函数来执行事件循环。
5、事件循环结束后,使用*_free系列函数来释放系统资源。
3.2 事件处理流程
当应用程序向 libevent 注册一个事件后,libevent 内部是怎么样进行处理的呢?
下面的图就给出了这一基本流程。
1)首先应用程序准备并初始化 event,设置好事件类型和回调函数;这对应于前面第步骤 2 和 3;
2)向 libevent 添加该事件 event。对于定时事件,libevent 使用一个小根堆管理,key 为超 时时间;对于 Signal 和 I/O 事件,libevent 将其放入到等待链表(wait list)中,这是一 个双向链表结构;
3)程序调用 event_base_dispatch()系列函数进入无限循环,等待事件,以 select()函数为例; 每次循环前 libevent 会检查定时事件的最小超时时间 tv,根据 tv 设置 select()的最大等待时间,以便于后面及时处理超时事件; 当 select()返回后,首先检查超时事件,然后检查 I/O 事件;
四、libevent 源代码文件组织
4.1 源代码组织结构
1)头文件
主要就是 event.h:事件宏定义、接口函数声明,主要结构体 event 的声明;
2)内部头文件
xxx-internal.h:内部数据结构和函数,对外不可见,以达到信息隐藏的目的;
3) libevent 框架
event.c: event 整体框架的代码实现;
4)对系统 I/O 多路复用机制的封装
epoll.c:对 epoll 的封装;
select.c:对 select 的封装;
devpoll.c:对 dev/poll 的封装;
kqueue.c:对 kqueue 的封装;
5)定时事件管理
min-heap.h:其实就是一个以时间作为 key 的小根堆结构;
6)信号管理
signal.c:对信号事件的处理;
7)辅助功能函数
evutil.h 和 evutil.c:一些辅助功能函数,包括创建 socket pair 和一些时间操作函数:加、减和比较等。
8)日志
log.h 和 log.c: log 日志函数
9)缓冲区管理
evbuffer.c 和 buffer.c: libevent 对缓冲区的封装;
10)基本数据结构
compatsys 下的两个源文件: queue.h 是 libevent 基本数据结构的实现,包括链表,双向链表,队列等; _libevent_time.h:一些用于时间操作的结构体定义、函数和宏定义;
11)实用网络库
http 和 evdns:是基于 libevent 实现的 http 服务器和异步 dns 查询库
五、libevent 的核心
5.1 ibevent 的核心---event
Libevent 是基于事件驱动(event-driven)的,从名字也可以看到 event 是整个库的核心。 event 就是 Reactor 框架中的事件处理程序组件;它提供了函数接口,供 Reactor 在事件发生时调用,以执行相应的事件处理,通常它会绑定一个有效的句柄。 首先给出 event 结构体的声明,它位于libevent-masterincludeevent2event_struct.h文件中:
struct event {
/**
* ev_callback,event的回调函数,被ev_base调用,执行事件处理程序,这是一个函数指针,原型为:
* void (*ev_callback)(int fd, short events, void *arg)
* 其中参数fd对应于ev_fd;events对应于ev_events;
*
* 具体定义在上面
* 以下为一些常用的宏定义
* #define ev_pri ev_evcallback.evcb_pri
* #define ev_flags ev_evcallback.evcb_flags
* #define ev_closure ev_evcallback.evcb_closure
* #define ev_callback ev_evcallback.evcb_cb_union.evcb_callback
* #define ev_arg ev_evcallback.evcb_arg
*/
struct event_callback ev_evcallback; //event的回调函数,被ev_base调用
/* for managing timeouts */
/**
* min_heap_idx和ev_timeout,如果是timeout事件,它们是event在小根堆中的索引和超时值,
* libevent使用小根堆来管理定时事件
* 用来管理超时事件
*/
union {
// 公用超时队列
TAILQ_ENTRY(event) ev_next_with_common_timeout;
// min_heap最小堆索引
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd; //对于I/O事件,是绑定的文件描述符;对于signal事件,是绑定的信号;
short ev_events; //event关注的事件类型,它可以是以下3种类型:I/O事件、定时事件、信号、辅助选项(EV_PERSIST)
short ev_res; /* result passed to event callback 记录了当前激活事件的类型*/
struct event_base *ev_base; //该事件所属的反应堆实例,libevent句柄,每个事件都会保存一份句柄
/**
* 用共用体来同时表现IO事件和信号
* 以下为一些方便调用的宏定义
* mutually exclusive
* #define ev_signal_next ev_.ev_signal.ev_signal_next
* #define ev_io_next ev_.ev_io.ev_io_next
* #define ev_io_timeout ev_.ev_io.ev_timeout
*
* used only by signals
* #define ev_ncalls ev_.ev_signal.ev_ncalls
* #define ev_pncalls ev_.ev_signal.ev_pncalls
*/
union {
/* used for io events */
struct {
// 下一个io事件
LIST_ENTRY (event) ev_io_next; //使用双向链表保存所有注册的I/O和Signal事件
// 事件超时时间(既可以是相对时间,也可以是绝对时间)
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
// 下一个信号
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls; //事件就绪执行时,调用ev_callback的次数,通常为1;
/* Allows deletes in callback */
short *ev_pncalls; //指针,通常指向ev_ncalls或者为NULL
} ev_signal;
} ev_;
// 保存事件的超时时间
struct timeval ev_timeout;
};
下面详细解释一下结构体中各字段的含义,注意部分变量有可能位于event_callback结构体中。
1)ev_events:event关注的事件类型,它可以是以下3种类型:
- I/O事件:EV_WRITE和EV_READ
- 定时事件:EV_TIMEOUT
- 信号事件: EV_SIGNAL
- 辅助选项:EV_PERSIST,表明是一个永久事件
Libevent中的定义为:
/** Indicates that a timeout has occurred. It's not necessary to pass
* this flag to event_for new()/event_assign() to get a timeout. */
// 定时事件
#define EV_TIMEOUT 0x01
/** Wait for a socket or FD to become readable */
// 读事件
#define EV_READ 0x02
/** Wait for a socket or FD to become writeable */
// 写事件
#define EV_WRITE 0x04
/** Wait for a POSIX signal to be raised*/
// 信号事件
#define EV_SIGNAL 0x08
/**
* Persistent event: won't get removed automatically when activated.
*
* When a persistent event with a timeout becomes activated, its timeout
* is reset to 0.
*/
// 永久事件,激活执行后会重新加到队列中等待下一次激活,否则激活执行后会自动移除
#define EV_PERSIST 0x10
/** Select edge-triggered behavior, if supported by the backend. */
// 边沿触发,一般需要后台方法支持
#define EV_ET 0x20
/**
* If this option is provided, then event_del() will not block in one thread
* while waiting for the event callback to complete in another thread.
*
* To use this option safely, you may need to use event_finalize() or
* event_free_finalize() in order to safely tear down an event in a
* multithreaded application. See those functions for more information.
*
* THIS IS AN EXPERIMENTAL API. IT MIGHT CHANGE BEFORE THE LIBEVENT 2.1 SERIES
* BECOMES STABLE.
**/
// 终止事件,如果设置这个选项,则event_del不会阻塞,需要使用event_finalize或者
#define EV_FINALIZE 0x40
/**
* Detects connection close events. You can use this to detect when a
* connection has been closed, without having to read all the pending data
* from a connection.
*
* Not all backends support EV_CLOSED. To detect or require it, use the
* feature flag EV_FEATURE_EARLY_CLOSE.
**/
// 检查事件连接是否关闭;可以使用这个选项来检测链接是否关闭,而不需要读取此链接所有未决数据;
#define EV_CLOSED 0x80
可以看出事件类型可以使用“|”运算符进行组合,需要说明的是,信号和I/O事件不能同时设置; 还可以看出libevent使用event结构体将这3种事件的处理统一起来;
2)ev_next,ev_active_next 和 ev_signal_next 都是双向链表节点指针;它们是 libevent 对不同事件类型和在不同的时期,对事件的管理时使用到的字段。 libevent 使用双向链表保存所有注册的 I/O 和 Signal 事件,ev_next 就是该 I/O 事件在链表中的位置;称此链表为“已注册事件链表”; 同样 ev_signal_next 就是 signal 事件在 signal 事件链表中的位置; ev_active_next:libevent 将所有的激活事件放入到链表 active list 中,然后遍历 active list 执行调度,ev_active_next 就指明了 event 在 active list 中的位置;
3)min_heap_idx 和 ev_timeout,如果是 timeout 事件,它们是 event 在小根堆中的索引和超时值,libevent 使用小根堆来管理定时事件。
4)ev_base 该事件所属的反应堆实例,这是一个 event_base 结构体。
5)ev_fd,对于 I/O 事件,是绑定的文件描述符;对于 signal 事件,是绑定的信号;
6)ev_callback,是一个结构体,里面包含有事件的回调函数(54-57行),这个回调函数被 ev_base 调用,执行事件处理程序,原型为:
struct event_callback {
//下一个回调事件
TAILQ_ENTRY(event_callback) evcb_active_next;
/**
*
* 回调事件的状态标识,具体为:
* #define EVLIST_TIMEOUT 0x01 // event在time堆中,min_heap
* #define EVLIST_INSERTED 0x02 // event在已注册事件链表中,event_base的queue中
* #define EVLIST_SIGNAL 0x04 // 未见使用
* #define EVLIST_ACTIVE 0x08 // event在激活链表中,event_base的active_queue中
* #define EVLIST_INTERNAL 0x10 // 内部使用标记
* #define EVLIST_ACTIVE_LATER 0x20 event在下一次激活链表中
* #define EVLIST_INIT 0x80 // event已被初始化
* #define EVLIST_ALL 0xff // 主要用于判断事件状态的合法性
*/
short evcb_flags;
// 回调函数的优先级,越小优先级越高
ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
// 执行不同的回调函数
// /** @name Event closure codes
// Possible values for evcb_closure in struct event_callback
// @{
// */
// /** A regular event. Uses the evcb_callback callback 事件关闭时的回调函数模式类型 */
// // 常规事件,使用evcb_callback回调
// #define EV_CLOSURE_EVENT 0
// /** A signal event. Uses the evcb_callback callback */
// // 信号事件;使用evcb_callback回调
// #define EV_CLOSURE_EVENT_SIGNAL 1
// /** A persistent non-signal event. Uses the evcb_callback callback */
// // 永久性非信号事件;使用evcb_callback回调
// #define EV_CLOSURE_EVENT_PERSIST 2
// /** A simple callback. Uses the evcb_selfcb callback. */
// // 简单回调,使用evcb_selfcb回调
// #define EV_CLOSURE_CB_SELF 3
// /** A finalizing callback. Uses the evcb_cbfinalize callback. */
// // 结束的回调,使用evcb_cbfinalize回调
// #define EV_CLOSURE_CB_FINALIZE 4
// /** A finalizing event. Uses the evcb_evfinalize callback. */
// // 结束事件回调,使用evcb_evfinalize回调
// #define EV_CLOSURE_EVENT_FINALIZE 5
// /** A finalizing event that should get freed after. Uses the evcb_evfinalize
// * callback. */
// // 结束事件之后应该释放,使用evcb_evfinalize回调
// #define EV_CLOSURE_EVENT_FINALIZE_FREE 6
// /** @} */
ev_uint8_t evcb_closure;
/* allows us to adopt for different types of events */
// 允许我们自动适配不同类型的回调事件
union {
void (*evcb_callback)(evutil_socket_t, short, void *);
void (*evcb_selfcb)(struct event_callback *, void *);
void (*evcb_evfinalize)(struct event *, void *);
void (*evcb_cbfinalize)(struct event_callback *, void *);
} evcb_cb_union;
// 回调参数
void *evcb_arg;
};
7)ev_arg:void*,表明可以是任意类型的数据,在设置 event 时指定;
8)eb_flags:libevent 用于标记 event 信息的字段,表明其当前的状态,可能的值有:
//事件状态标志
// 事件在time min_heap堆中
#define EVLIST_TIMEOUT 0x01
// 事件在已注册事件链表中
#define EVLIST_INSERTED 0x02
// 目前未使用
#define EVLIST_SIGNAL 0x04
// 事件在激活链表中
#define EVLIST_ACTIVE 0x08
// 内部使用标记
#define EVLIST_INTERNAL 0x10
// 事件在下一次激活链表中
#define EVLIST_ACTIVE_LATER 0x20
// 事件已经终止
#define EVLIST_FINALIZING 0x40
// 事件初始化完成,但是哪儿都不在
#define EVLIST_INIT 0x80
// 包含所有事件状态,用于判断合法性的
#define EVLIST_ALL 0xff
9)ev_ncalls:事件就绪执行时,调用 ev_callback 的次数,通常为 1;
10)ev_pncalls:指针,通常指向 ev_ncalls 或者为 NULL;
11)ev_res:记录了当前激活事件的类型
5.2 libevent 对 event 的管理
从event 结构体中的 3 个链表节点指针和一个堆索引出发,大体上也能窥出 libevent 对 event 的管理方法了,可以参见下面的示意图。 每次当有事件 event 转变为就绪状态时,libevent 就会把它移入到 active event list[priority] 中,其中 priority 是 event 的优先级; 接着 libevent 会根据自己的调度策略选择就绪事件,调用其 cb_callback()函数执行事件处理;并根据就绪的句柄和事件类型填充 cb_callback 函数的参数。
5.3 事件设置的接口函数
要向 libevent 添加一个事件,需要首先设置 event 对象,这通过调用 libevent 提供的函数有:event_set(), event_base_set(), event_priority_set()来完成;下面分别进行讲解。
void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)
1.设置事件 ev 绑定的文件描述符或者信号,对于定时事件,设为-1 即可;
2.设置事件类型,比如 EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL 等;
3.设置事件的回调函数以及参数 arg;
4.初始化其它字段,比如缺省的 event_base 和优先级;
int event_base_set(struct event_base *base, struct event *ev)
设置 event ev 将要注册到的 event_base;
libevent 有一个全局 event_base 指针 current_base,默认情况下事件 ev 将被注册到 current_base 上,使用该函数可以指定不同的 event_base;
如果一个进程中存在多个 libevent 实例,则必须要调用该函数为 event 设置不同的 event_base;
int event_priority_set(struct event *ev, int pri)
设置event ev的优先级,没什么可说的,注意的一点就是:当ev正处于就绪状态时,不能设置,返回-1。
六、初见事件处理框架
前面已经对 libevent 的事件处理框架和 event 结构体做了描述,现在是时候剖析 libevent 对事件的详细处理流程了,本节将分析 libevent 的事件处理框架 event_base 和 libevent 注册、 删除事件的具体流程,可结合前一节 libevent 对 event 的管理。
6.1 事件处理框架-event_base
回想 Reactor 模式的几个基本组件,本节讲解的部分对应于 Reactor 框架组件。在 libevent 中,这就表现为 event_base 结构体,结构体声明如下,它位于libevent-masterevent-internal.h 文件中:
struct event_base {
/** Function pointers and other data to describe this event_base's
* backend. */
/**
* 实际使用后台方法的句柄,实际上指向的是静态全局数组变量,从静态全局变量eventops中选择
*/
const struct eventop *evsel;
/** Pointer to backend-specific data. */
/**
* 指向后台特定的数据,是由evsel->init返回的句柄
* 实际上是对实际后台方法所需数据的封装,void出于兼容性考虑
*/
void *evbase;
/** List of changes to tell backend about at next dispatch. Only used
* by the O(1) backends. */
// 告诉后台方法下一次调度的变化列表
struct event_changelist changelist;
/** Function pointers used to describe the backend that this event_base
* uses for signals */
// 用于描述当前event_base用于信号的后台方法
const struct eventop *evsigsel;
/** Data to implement the common signal handler code. */
// 用于实现公用信号句柄的代码
struct evsig_info sig;
/** Number of virtual events */
// 虚拟事件的数量
int virtual_event_count;
/** Maximum number of virtual events active */
// 虚拟事件的最大数量
int virtual_event_count_max;
/** Number of total events added to this event_base */
// 添加到event_base上事件总数
int event_count;
/** Maximum number of total events added to this event_base */
// 添加到event_base上的最大个数
int event_count_max;
/** Number of total events active in this event_base */
// 当前event_base中活跃事件的个数
int event_count_active;
/** Maximum number of total events active in this event_base */
// 当前event_base中活跃事件的最大个数
int event_count_active_max;
/** Set if we should terminate the loop once we're done processing
* events. */
// 一旦我们完成处理事件了,如果我们应该终止loop,可以设置这个
int event_gotterm;
/** Set if we should terminate the loop immediately */
// 如果需要中止loop,可以设置这个变量
int event_break;
/** Set if we should start a new instance of the loop immediately. */
// 如果启动新实例的loop,可以设置这个
int event_continue;
/** The currently running priority of events */
// 当前运行事件的优先级
int event_running_priority;
/** Set if we're running the event_base_loop function, to prevent
* reentrant invocation. */
// 防止event_base_loop重入的
int running_loop;
/** Set to the number of deferred_cbs we've made 'active' in the
* loop. This is a hack to prevent starvation; it would be smarter
* to just use event_config_set_max_dispatch_interval's max_callbacks
* feature */
/**
* 设置已经在loop中设置为’active’的deferred_cbs的个数,这是为了避免
* 饥饿的hack方法;只需要使用event_config_set_max_dispatch_interval’s的
* max_callbacks特征就可以变的更智能
*/
int n_deferreds_queued;
/* Active event management. // 活跃事件管理*/
/** An array of nactivequeues queues for active event_callbacks (ones
* that have triggered, and whose callbacks need to be called). Low
* priority numbers are more important, and stall higher ones.
* 存储激活事件的event_callbacks的队列,这些event_callbacks都需要调用;
* 数字越小优先级越高
*/
struct evcallback_list *activequeues;
/** The length of the activequeues array 活跃队列的长度*/
int nactivequeues;
/** A list of event_callbacks that should become active the next time
* we process events, but not this time. */
// 下一次会变成激活状态的回调函数的列表,但是当前这次不会调用
struct evcallback_list active_later_queue;
/* common timeout logic // 公用超时逻辑*/
/** An array of common_timeout_list* for all of the common timeout
* values we know.
* 公用超时事件列表,这是二级指针,每个元素都是具有同样超时
* 时间事件的列表,
*/
struct common_timeout_list **common_timeout_queues;
/** The number of entries used in common_timeout_queues */
// 公用超时队列中的项目个数
int n_common_timeouts;
/** The total size of common_timeout_queues. */
// 公用超时队列的总个数
int n_common_timeouts_allocated;
/** Mapping from file descriptors to enabled (added) events */
// 文件描述符和事件之间的映射表
struct event_io_map io;
/** Mapping from signal numbers to enabled (added) events. */
// 信号数字和事件之间映射表
struct event_signal_map sigmap;
/** Priority queue of events with timeouts. */
// 事件超时的优先级队列,使用最小堆实现
struct min_heap timeheap;
/** Stored timeval: used to avoid calling gettimeofday/clock_gettime
* too often. */
// 存储时间:用来避免频繁调用gettimeofday/clock_gettime
struct timeval tv_cache;
// monotonic格式的时间
struct evutil_monotonic_timer monotonic_timer;
/** Difference between internal time (maybe from clock_gettime) and
* gettimeofday. */
// 内部时间(可以从clock_gettime获取)和gettimeofday之间的差异
struct timeval tv_clock_diff;
/** Second in which we last updated tv_clock_diff, in monotonic time. */
// 更新内部时间的间隔秒数
time_t last_updated_clock_diff;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
/* threading support */
/** The thread currently running the event_loop for this base */
unsigned long th_owner_id;
/** A lock to prevent conflicting accesses to this event_base */
void *th_base_lock;
/** A condition that gets signalled when we're done processing an
* event with waiters on it. */
void *current_event_cond;
/** Number of threads blocking on current_event_cond. */
int current_event_waiters;
#endif
/** The event whose callback is executing right now */
// 当前执行的回调函数
struct event_callback *current_event;
#ifdef _WIN32
/** IOCP support structure, if IOCP is enabled. */
struct event_iocp_port *iocp;
#endif
/** Flags that this base was configured with */
// event_base配置的特征值
// 多线程调用是不安全的,单线程非阻塞模式
// EVENT_BASE_FLAG_NOLOCK = 0x01,
// 忽略检查EVENT_*等环境变量
// EVENT_BASE_FLAG_IGNORE_ENV = 0x02,
// 只用于windows
// EVENT_BASE_FLAG_STARTUP_IOCP = 0x04,
// 不使用缓存的时间,每次回调都会获取系统时间
// EVENT_BASE_FLAG_NO_CACHE_TIME = 0x08,
// 如果使用epoll方法,则使用epoll内部的changelist
// EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10,
// 使用更精确的时间,但是可能性能会降低
// EVENT_BASE_FLAG_PRECISE_TIMER = 0x20
enum event_base_config_flag flags;
// 最大调度时间间隔
struct timeval max_dispatch_time;
// 最大调度的回调函数个数
int max_dispatch_callbacks;
// 优先级设置之后,对于活跃队列中子队列个数的限制
// 但是当子队列个数超过这个限制之后,会以实际的回调函数个数为准
int limit_callbacks_after_prio;
/* Notify main thread to wake up break, etc. */
/** True if the base already has a pending notify, and we don't need
* to add any more. */
//如果为1表示当前可以唤醒主线程,否则不能唤醒主线程
int is_notify_pending;
/** A socketpair used by some th_notify functions to wake up the main
* thread. */
// 一端读、一端写,用来触发唤醒事件
evutil_socket_t th_notify_fd[2];
/** An event used by some th_notify functions to wake up the main
* thread. */
// 唤醒event_base的event,被添加到监听集合中的对象
struct event th_notify;
/** A function used to wake up the main thread from another thread. */
//执行唤醒操作的函数(不是唤醒event的回调函数)
int (*th_notify_fn)(struct event_base *base);
/** Saved seed for weak random number generator. Some backends use
* this to produce fairness among sockets. Protected by th_base_lock. */
// 保存弱随机数产生器的种子。某些后台方法会使用这个种子来公平的选择sockets。
struct evutil_weakrand_state weakrand_seed;
/** List of event_onces that have not yet fired. */
LIST_HEAD(once_event_list, event_once) once_events;
};
下面详细解释一下结构体中部分字段的含义。
1)evsel 和 evbase 这两个字段的设置可能会让人有些迷惑,这里你可以把 evsel 和 evbase 看作是类和静态函数的关系,比如添加事件时的调用行为:evsel->add(evbase, ev),实际执 行操作的是 evbase;这相当于 class::add(instance, ev),instance 就是 class 的一个对象实例。 evsel指向了全局变量static const struct eventop *eventops[]中的一个;libevent将系统提供的I/O demultiplex机制统一封装成了eventop结构;因此 eventops[]包含了select、poll、kequeue和epoll等等其中的若干个全局实例对象。 evbase实际上是一个eventop实例对象; 先来看看eventop结构体,它的成员是一系列的函数指针, 在event-internal.h文件中:
/** Structure to define the backend of a given event_base. */
struct eventop {
/** The name of this backend. 后台方法名字,即epoll,select,poll等*/
const char *name;
/** Function to set up an event_base to use this backend. It should
* create a new structure holding whatever information is needed to
* run the backend, and return it. The returned pointer will get
* stored by event_init into the event_base.evbase field. On failure,
* this function should return NULL. */
/**
* 配置libevent句柄event_base使用当前后台方法;他应该创建新的数据结构,
* 隐藏了后台方法运行所需的信息,然后返回这些信息的结构体,为了支持多种
* 结构体,因此返回void*;返回的指针将保存在event_base.evbase中;如果失败,
* 将返回NULL
*/
void *(*init)(struct event_base *);
/** Enable reading/writing on a given fd or signal. 'events' will be
* the events that we're trying to enable: one or more of EV_READ,
* EV_WRITE, EV_SIGNAL, and EV_ET. 'old' will be those events that
* were enabled on this fd previously. 'fdinfo' will be a structure
* associated with the fd by the evmap; its size is defined by the
* fdinfo field below. It will be set to 0 the first time the fd is
* added. The function should return 0 on success and -1 on error.
*/
/**
* 使给定的文件描述符或者信号变得可读或者可写。’events’将是我们尝试添加的
* 事件类型:一个或者更多的EV_READ,EV_WRITE,EV_SIGNAL,EV_ET。’old’是这些事件
* 先前的事件类型;’fdinfo’将是fd在evmap中的辅助结构体信息,它的大小由下面的
* fdinfo_len给出。fd第一次添加时将设置为0.成功则返回0,失败则返回-1
*/
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** As "add", except 'events' contains the events we mean to disable. */
// 删除事件
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** Function to implement the core of an event loop. It must see which
added events are ready, and cause event_active to be called for each
active event (usually via event_io_active or such). It should
return 0 on success and -1 on error.
*/
/**
* event_loop实现的核心代码。他必须察觉哪些添加的事件已经准备好,然后触发每个
* 活跃事件都被调用(通常是通过event_io_active或者类似这样)。成功返回0,失败则-1
*/
int (*dispatch)(struct event_base *, struct timeval *);
/** Function to clean up and free our data from the event_base. */
// 清除event_base并释放数据
void (*dealloc)(struct event_base *);
/** Flag: set if we need to reinitialize the event base after we fork.
*/
// 在执行fork之后是否需要重新初始化的标识位
int need_reinit;
/** Bit-array of supported event_method_features that this backend can
* provide. */
// 后台方法可以提供的特征
// enum event_method_feature {
// 边沿触发
// EV_FEATURE_ET = 0x01,
// 要求事后台方法在调度很多事件时大约为O(1)操作,select和poll无法提供这种特征,
// 这两种方法具有N个事件时,可以提供O(N)操作
// EV_FEATURE_O1 = 0x02,
// 后台方法可以处理各种文件描述符,而不仅仅是sockets
// EV_FEATURE_FDS = 0x04,
/** Require an event method that allows you to use EV_CLOSED to detect
* connection close without the necessity of reading all the pending data.
*
* Methods that do support EV_CLOSED may not be able to provide support on
* all kernel versions.
**/
// 要求后台方法允许使用EV_CLOSED特征检测链接是否中断,而不需要读取
// 所有未决数据;但是不是所有内核都能提供这种特征
// EV_FEATURE_EARLY_CLOSE = 0x08
// };
enum event_method_feature features;
/** Length of the extra information we should record for each fd that
has one or more active events. This information is recorded
as part of the evmap entry for each fd, and passed as an argument
to the add and del functions above.
*/
/**
* 应该为每个文件描述符保留的额外信息长度,额外信息可能包括一个或者多个
* 活跃事件。这个信息是存储在每个文件描述符的evmap中,然后通过参数传递
* 到上面的add和del函数中。
*/
size_t fdinfo_len;
};
也就是说,在 libevent 中,每种 I/O demultiplex 机制的实现都必须提供这五个函数接口, 来完成自身的初始化、销毁释放;对事件的注册、注销和分发。 比如对于 epoll,libevent 实现了 5 个对应的接口函数,并在初始化时并将 eventop 的 5 个函数指针指向这 5 个函数,那么程序就可以使用 epoll 作为 I/O demultiplex 机制了,这个在后面会再次提到。
2)activequeues 是一个一级指针,前面讲过 libevent 支持事件优先级,因此你可以把它看作是一维数组,其中的元素 activequeues[priority]是一个链表,链表的每个节点指向一个优先级为 priority 的就绪事件 event,实际上链表当中存放的是事件的回调函数。
3)io,管理IO事件的结构体变量。
4)sigmap 是由来管理信号的结构体,将在后面信号处理时专门讲解;
5)timeheap 是管理定时事件的小根堆,将在后面定时事件处理时专门讲解;
6)tv_cache 是 libevent 用于时间管理的变量,存储时间:用来避免频繁调用gettimeofday/clock_gettime;
6.2 创建和初始化 event_base
创建一个 event_base 对象也既是创建了一个新的 libevent 实例,程序需要通过调用 event_init()(内部调用 event_base_new 函数执行具体操作)函数来创建,该函数同时还对新生成的 libevent 实例进行了初始化。 该函数首先为 event_base 实例申请空间,然后初始化定时事件使用的mini-heap,选择并初始化合适的系统 I/O 的 demultiplexer 机制,初始化各事件链表; 函数还检测了系统的时间设置,为后面的时间管理打下基础。
6.3 接口函数
前面提到 Reactor 框架的作用就是提供事件的注册、注销接口;根据系统提供的事件多路分发机制执行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数来处理事件。 Libevent 中对应的接口函数主要就是:
int event_add(struct event *ev, const struct timeval *timeout);
int event_del(struct event *ev);
int event_base_loop(struct event_base *base, int loops);
void event_active(struct event *event, int res, short events);
void event_process_active(struct event_base *base);
6.3.1 注册事件
函数原型:
/**
* 事件注册-event_add
* 1、将事件添加到等待事件中去,需要注意的是,event_add在event_new或者event_assign之后执行,
* 即添加的事件必须是经过基本初始化过后的事件;
* 2、此处添加的事件包括IO事件、信号事件、定时事件,根据事件申请时设置的事件类型决定添加的流程;
* 3、超时控制包括两种方式:
* (1)最小堆:时间超时时间存储在最小堆,每次执行超时任务都从最小堆堆顶取任务执行
* (2)最小堆+公用超时队列:相同超时的任务存储在同一个超时队列,每一个超时队列的队首事件存储在最小堆,
* 每次执行超时任务时都从最小堆堆顶取任务执行,然后遍历执行该任务所在公用超时队列中的所有超时任务。
*/
int event_add(struct event *ev, const struct timeval *tv)
参数:ev:指向要注册的事件;
tv:超时时间;
函数将 ev 注册到 ev->ev_base 上,事件类型由 ev->ev_events 指明,如果注册成功,ev 将被插入到已注册链表中;如果 tv 不是 NULL,则会同时注册定时事件,将 ev 添加到 timer 堆上; 如果其中有一步操作失败,那么函数保证没有事件会被注册,可以讲这相当于一个原子操作。
int event_add(struct event *ev, const struct timeval *tv)
{
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);//加锁
// 实际时调用内部实现函数event_add_nolock实现的,下文会分析
res = event_add_nolock_(ev, tv, 0);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
event_add_nolock()函数
/* Implementation function to add an event. Works just like event_add,
* except: 1) it requires that we have the lock. 2) if tv_is_absolute is set,
* we treat tv as an absolute time, not as an interval to add to the current
* time
* 此函数真正实现将事件添加到event_base的等待列表中。
* 真正的将信号事件注册在event_base上,也就是进行了信号的内部事件注册
*
* 添加事件的实现函数;就像event_add一样,异常:
* 1)它需要使用者加锁;2)如果tv_is_absolute设置了,则将tv作为绝对时间对待,而不是相对于当前添加时间的时间间隔
*/
int
event_add_nolock_(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
EVENT_BASE_ASSERT_LOCKED(base);
event_debug_assert_is_setup_(ev);
event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));
// 事件状态必须处于合法的某种事件状态,否则报错
EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));
// 已经处于结束状态的事件再次添加会报错
if (ev->ev_flags & EVLIST_FINALIZING) {
/* XXXX debug */
return (-1);
}
/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
/**
* 为超时插入做准备,如果超时控制不为空,且事件没有处于超时状态
* 首先为将要插入的超时事件准备插入节点,主要是为了防止后面出现这种情况:
* 事件状态改变已经完成,但是最小堆申请节点却失败;
* 因此,如果在任何一步出现错误,都不能改变事件状态,这是前提条件。
*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve_(&base->timeheap,
1 + min_heap_size_(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
/* If the main thread is currently executing a signal event's
* callback, and we are not the main thread, then we want to wait
* until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */
/**
* 如果主线程当前正在执行信号事件的回调函数,同时又不在主线程,则
* 需要等待回调函数执行完毕才能继续添加事件,否则可能会在
* ev_ncalls和ev_pncalls上产生竞争。
*/
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == event_to_event_callback(ev) &&
(ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
/**
* 如果事件类型是IO事件/信号事件,同时事件状态不是已经插入/激活/下一次激活状态,
* 则根据事件类型将事件添加到不同的映射表或者队列中
*/
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
// 如果事件是IO事件,则将事件插入到IO事件与文件描述符的映射表中
if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
res = evmap_io_add_(base, ev->ev_fd, ev);
// 如果事件是信号事件,则将事件插入信号与文件描述符的映射表中
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
// 如果上述添加行为正确,则将事件插入到event_base的事件列表中
if (res != -1)
event_queue_insert_inserted(base, ev);//其实就是这是事件的已注册标志
/**
* 如果上述添加行为正确,则设置通知主线程的标志,因为已经添加了新事件,
* 防止1)优先级高的事件被优先级低的事件倒挂,2)防止主线程忙等,会通知主线程有新事件
*/
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}
/*
* we should change the timeout state only if the previous event
* addition succeeded.
* 只有当前面事件条件成功执行之后,才能改变超时状态
*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
#ifdef USE_REINSERT_TIMEOUT
int was_common;
int old_timeout_idx;
#endif
/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*
* 对于持久化的定时事件,需要记住超时时间,并重新注册事件
* 如果tv_is_absolute设置,则事件超时时间就等于输入时间参数
*/
if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
/**
* 如果没有使用USE_REINSERT_TIMEOUT,则当事件处于超时状态时,需要从队列中移除事件
* 因为同样的事件不能重新插入,所以当一个事件已经处于超时状态时,为防止执行,需要先移除后插入
*/
#ifndef USE_REINSERT_TIMEOUT
if (ev->ev_flags & EVLIST_TIMEOUT) {
event_queue_remove_timeout(base, ev);
}
#endif
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list.
* 检查事件当前状态是否已经激活,而且是超时事件的激活状态,
* 则在回调函数执行之前,需要重新调度这个超时事件,因此需要把它移出激活队列
* */
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}
// 将此事件的回调函数从激活队列中移除
event_queue_remove_active(base, event_to_event_callback(ev));
}
// 获取base中的缓存时间
gettime(base, &now);
// 检查base是否使用了公用超时队列机制
common_timeout = is_common_timeout(tv, base);
#ifdef USE_REINSERT_TIMEOUT
was_common = is_common_timeout(&ev->ev_timeout, base);
old_timeout_idx = COMMON_TIMEOUT_IDX(&ev->ev_timeout);
#endif
/**
* 1)如果设置绝对超时时间,则设置时间超时时间为输入时间参数
* 2)如果使用的公用超时队列机制,则根据当前base中时间和输入超时时间间隔计算出时间超时时间,
* 并对超时时间进行公用超时掩码计算
* 3)如果是其他情况,则直接根据base中时间和输入超时时间间隔计算事件的超时时间
*/
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: event %p, timeout in %d seconds %d useconds, call %p",
ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));
// 将事件插入超时队列
#ifdef USE_REINSERT_TIMEOUT
// event_queue_reinsert_timeout会插入两个队列:一个是公用超时队列,一个超时队列
event_queue_reinsert_timeout(base, ev, was_common, common_timeout, old_timeout_idx);
#else
// 只会插入超时队列
event_queue_insert_timeout(base, ev);
#endif
/**
* 如果使用了公用超时队列机制,则需要根据当前事件的超时时间将当前事件插入具有相同超时时间的时间列表
*/
if (common_timeout) {
// 根据事件超时时间获取应该插入的公用超时队列,注意此处是从队尾插入
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
/**
* 如果当前事件是公用超时队列的第一个事件,则因此需要将此超时事件插入最小堆
* 解释:公用超时队列机制:处于同一个公用超时队列中的所有事件具有相同的超时控制,因此只需要将公用超时队列
* 的第一个事件插入最小堆,当超时触发时,可以通过遍历公用超时队列获取同样的超时事件。
*/
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
// 如果没有使用公用超时队列,则调整最小堆
struct event* top = NULL;
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would otherwise.
* We double check the timeout of the top element to
* handle time distortions due to system suspension.
* 查看当前事件是否位于最小堆根部,如果是,则需要通知主线程
* 否则,需要查看最小堆根部超时时间是否已经小于当前时间,即已经超时了,如果是,则需要通知主线程
*/
if (min_heap_elt_is_top_(ev))
notify = 1;
else if ((top = min_heap_top_(&base->timeheap)) != NULL &&
evutil_timercmp(&top->ev_timeout, &now, <))
notify = 1;
}
}
/**
* if we are not in the right thread, we need to wake up the loop
* 如果本线程不是执行event_loop的主线程,就通知主线程
* */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
event_debug_note_add_(ev);
return (res);
}
6.3.2 删除事件
函数原型为:
static int event_del_(struct event *ev, int blocking)
该函数将删除事件 ev,对于 I/O 事件,从 I/O 的 demultiplexer 上将事件注销;对于 Signal 事件,将从 Signal 事件链表中删除;对于定时事件,将从堆上删除; 同样删除事件的操作则不一定是原子的,比如删除时间事件之后,有可能从系统 I/O 机 制中注销会失败。
static int
event_del_(struct event *ev, int blocking)
{
int res;
struct event_base *base = ev->ev_base;
if (EVUTIL_FAILURE_CHECK(!base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
res = event_del_nolock_(ev, blocking);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (res);
}