电子说
本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表,红黑树,时间轮实现定时器的思路和方法。
”
定时器在各种场景都需要用到,比如游戏的Buff实现,Redis中的过期任务,Linux中的定时任务等等。顾名思义,定时器的主要用途是执行定时任务。
定时器数据结构要求:
有序链表:插入O(n)
,删除O(1)
,过期expire
执行O(1)
最小堆:插入O(logn)
,删除O(logn)
,过期expire
执行O(1)
红黑树:插入O(logn)
,删除O(logn)
,过期expire
执行O(logn)
哈希表+链表(时间轮):插入O(1)
,删除O(1)
,过期expire
平均执行O(1)
(最坏为O(n)
)
不同开源框架定时器实现方式不一,如,libuv
采用最小堆来实现,nginx
采用红黑树实现,linux
内核和skynet
采用时间轮算法实现等等。
作为定时器,需要封装以下4类接口给用户使用:
init_timer
add_timer
cancel_timer
expire_timer
其中执行到期任务有两种工作方式:
接下来将介绍分别用跳表、红黑树、时间轮来实现定时器。
跳表是一种动态的数据结构,采用空间换时间的思想,在有序链表基础上加入多级索引,通过索引进行二分快速查找,支持快速删除、插入和查找操作(平均时间复杂度为O(logN)
,最坏为O(N)
),效率可与平衡树媲美,实现比其简单。
下面通过一张图来简单说明跳表操作。跳表的最底层即为基本的有序链表,存储所有的数据,可理解为数据层;往上则为索引层,理想状态下,上一层为下一层节点数的一半。比如,要查找下图的数据为11
的节点,从begin''
出发,向右走,如果下一个节点大于11
则往下走,直到找到目标节点。可见,跳表要比原始链表少比较一些节点,但前提是需要花更多空间存储索引节点。
image-20210323182236910
O(logn)
学会吸取开源框架中优秀数据结构和代码思想,直接采用redis
中跳表结构的实现,取出所需部分,用于实现定时器。如下:
跳表节点与跳表结构
/*skiplist.h*/
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPPLIST 0.25
typedef struct zskiplistNode zskiplistNode;
typedef void (*handler_pt) (zskiplistNode * node);
// 跳表节点
struct zskiplistNode {
unsigned long score; /*用于排序的值*/
handler_pt handler; /*处理函数*/
struct zskiplistLevel {
struct zskiplistNode **forward;
}level[];
};
// 跳表结构
typedef struct zskiplist {
struct zskiplistNode * header;
int length;
int level; /*跳表层数*/
}zskiplist;
具体接口实现细节请移步redis
源码。
/*skiplist.h*/
/*创建跳表,初始化*/
zskiplist *zslCreate(void);
/*删除跳,表释放资源*/
void zslFree(zskiplist *zsl);
/*插入节点*/
zskiplistNode *zslInsert(zskiplist *zsl, unsigned long score, handler_pt func);
/*删除头节点*/
void zsklDeleteHead(zskiplist *zsl);
/*删除任意节点*/
void zslDelete(zskiplist *zsl, zskplistNode *zn);
/*打印,调试*/
void zslPrint(zskiplist *zsl);
主要介绍四个接口实现:初始化定时器,添加定时任务,删除/取消定时任务,处理定时任务
// test_user.c 封装给用户使用的接口
static uint32_t
current_time() {
uint32_t t;
struct timespec ti;
clock_getttime(CLOCK_MONOTONIC, &ti);
t = (uint32_t)ti.tv_sec * 1000;
t += ti.tv_sec / 1000000;
}
zskiplist *init_timer() {
// 初始化定时器
return zslCreate();
}
zskiplistNode *add_timer(zskiplist *zsl, uint32_t msec, handler_pt func) {
// 添加定时任务
msec += current_time();
return zslInsert(zsl, msec, func);
}
void cancel_timer(zskiplist *zsl, zskiplistNode *zn) {
// 删除/取消定时任务
zslDelete(zsl, zn);
}
void expire_timer(zskiplist *zsl){
// 处理定时任务
zskiplistNode *x;
uint32_t now = current_time();
for (;;) {
x = zslMin(zsl); // 最近节点
if (!x) break;
if (x->score > now) break; // 时间未到
x->handler(x); // 执行相关定时任务
zslDeleteHead(zsl); // 执行完删除
}
}
红黑树是一种自平衡的二叉查找树,即,插入和删除操作如果破坏树的平衡时,需要重新调整达到平衡状态。因此,是一种比较难的数据结构。
弄懂红黑树如何调整树的平衡,保证满足这5条性质,是比较麻烦,需要耐心的去推导一遍,此处不展开。
AVL
树平衡要求太高,维护平衡操作过多,较复杂;红黑树只需维护一个黑高度,效率较高
红黑树查找,删除,添加时间复杂度为:O(log(n))
吸取开源框架中优秀数据结构和代码思想,选用nginx
中的红黑树结构
红黑树节点与红黑树
// rbtree.h 红黑树数据结构以及相关接口,具体接口实现同上
#ifndef _NGX_RBTREE_H_INCLUDE_
#define _NGX_RBTREE_H_INCLUDE_
typedef unsigned int ngx_rbtree_key_t;
typedef unsigned int ngx_uint_t;
typedef int ngx_rbtree_key_int_t;
// 红黑树节点
typedef struct ngx_rbtree_node_s ngx_rbtree_node_t;
struct ngx_rbtree_node_s {
ngx_rbtree_key_t key;
ngx_rbtree_node_t *left;
ngx_rbtree_node_t *right;
ngx_rbtree_node_t *parent;
u_char color; // 节点颜色
u_char data; // 节点数据
};
// 插入函数指针
typedef void (*ngx_rbtree_insert_pt) (ngx_rbtree_node_t *root,
ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
// 红黑树
typedef struct ngx_rbtree_s ngx_rbtree_t;
struct ngx_rbtree_s {
ngx_rbtree_node_t *root;
ngx_rbtree_node_t *sentinel;
ngx_rbtree_insert_pt insert;
};
// 红黑树初始化
#define ngx_rbtree_init(tree, s, i) \\
ngx_rbtree_sentinel_init(s); \\
(tree)->root = s; \\
(tree)->sentinel = s; \\
(tree)->insert = i;
// 插入操作
void ngx_rbtree_insert(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
// 删除操作
void ngx_rbtree_delete(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
// 插入value
void ngx_rbtree_insert_value(ngx_rbtree_node_t *root, ngx_rbtree_node_t *node,
ngx_rbtree_node_t *sentinel);
// 插入timer
void ngx_rbtree_insert_timer_value(ngx_rbtree_node_t *root,
ngx_rbtree_node_t *node,
ngx_rbtree_node_t *sentinel);
// 获取下一个节点
ngx_rbtree_node_t *ngx_rbtree_next(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);
#define ngx_rbt_red(node) ((node)->color = 1)
#define ngx_rbt_black(node) ((node)->color = 0)
#define ngx_rbt_is_red(node) ((node)->color)
#define ngx_rbt_is_black(node) (!ngx_rbt_is_red(node))
#define ngx_rbt_copy_color(n1, n2) (n1->color = n2->color)
#define ngx_rbtree_sentinel_init(node) ngx_rbt_black(node)
// 找到最小值,一直往左走即可
static inline ngx_rbtree_node_t *
ngx_rbtree_min(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
while (node->left != sentinel){
node = node->left;
}
return node;
}
// test_user.c 封装给用户使用的接口
ngx_rbtree_t timer;
static ngx_rbtree_node_t sentinel;
typedef struct timer_entry_s timer_entry_t;
typedef void (*timer_handler_pt)(timer_entry_t *ev);
struct timer_entry_s {
ngx_rbtree_node_t timer;
timer_handler_pt handler;
};
// 初始化
int init_timer() {
ngx_rbtree_init(&timer, &sentinel, ngx_rbtree_insert_timer_value);
return 0;
}
// 添加定时任务
void add_timer(timer_entry_t *te, uint32_t msec) {
msec += current_time();
te->timer.key = msec;
ngx_rbtree_insert(&timer, &te->timer);
}
// 取消定时
void cancel_timer(timer_entry_t *te) {
ngx_rbtree_delete(&timer, &te->timer);
}
// 执行到期任务
void expire_timer() {
timer_entry_t *te;
ngx_rbtree_node_t *sentinel, *root, *node;
sentinel = timer.sentinel;
uint32_t now = current_time();
for(;;){
root = timer.root;
if (root == sentinel) break;
if (node->key > now) break;
te = (timer_entry_t *) ((char *) node - offsetof(timer_entry_t, timer));
te->handler(te);
ngx_rbtree_delete(&timer, &te->timer);
free(te);
}
}
以上,为红黑树和跳表实现的定时器,多线程环境下加锁粒度比较大,高并发场景下效率不高,而时间轮适合高并发场景,如下。
可以用于高效的执行大量定时任务,如下为分层时间轮示意图:
timewheel
时间轮可参考时钟进行理解,秒针(Seconds wheel)转一圈,则分针(Minutes wheel)走一格,分针(Minutes wheel)转一圈,则时针(Hours wheel)走一格。随着,时间的流逝,任务不断从上层流下下一层,最终到达秒针轮上,当秒针走到时执行。
如上所示,时间轮大小为8
格,秒针1s
转动一格,其每一格所指向的链表保存着待执行任务。比如,如果当前指针指向1
,要添加一个3s
后执行的任务,由于1+3=4
,即在第4
格的链表中添加一个任务节点即可。如果要添加一个10s
后执行的任务,10+1=11
,超过了秒针轮范围,因此需要对8取模11 % 8 = 3
,即,会把这个任务放到分针轮上3
对应的链表上,之后再从分针轮把任务丢到秒针轮上进行处理。也即,**秒针轮(Seconds wheel)**即保存着最近将要执行的任务,随着时间的流逝,任务会慢慢的从上层流到秒针轮中进行执行。
优点:加锁粒度较小,只需要加一个格子即可,一个格子对应一串链表;适合高并发场景
缺点:不好删除
true
, 在函数回调中根据这个标记判断是否需要处理这里介绍两种定时器实现方案,一种是简单实现方案,另一种是skynet
较为复杂的实现。
功能场景:由心跳包进行超时连接检测,10s未收到则断开连接
一般做法:map
每秒轮询这个结构,检测所有连接是否超时,收到心跳包,记录时间戳
缺点:效率很差,每次需要检测所有连接,时间复杂度为O(n)
优化:分治大法,只需检测快过期的连接, 采用hash数组+链表形式,数组大小设置成16 :[0] + [1] + [2] + ... + [15]
,相同过期时间的放入一个数组,因此,每次只需检测最近过期的数组即可,不需要遍历所有。
以下为定时器节点,增加引用计数ref
, 只有当ref
为0时删除连接。
class CTimerNode {
public:
CTimerNode(int fd) : id(fd), ref(0) {}
void Offline() {this->ref = 0};
bool tryKill() {
if (this->ref == 0) return true;
DecRef();
if (this->ref == 0){
return true;
}
return false;
}
void IncRef() {this->ref++;}
protected:
void DecRef() {this->ref--;}
private:
int ref;
int id;
}
// 时间轮数组大小16, (x对16取余)==(x&1111) 落到0-15之间,即落到对应的数组
const int TW_SIZE = 16;
const in EXPIRE = 10; // 过期间隔
const int TW_MASK = TW_SIZE - 1; // 掩码, 用于对16取余
static size_t iReadTick = 0; // 滴答时钟
typedef list
// 添加定时
void AddTimeOut(TimerWheel &tw, CTimerNode *p) {
if (p) {
p->IncRef();
// 找到iRealTick对应数组的idx(槽位)
TimeList &le = tw[(iRealTick+EXPIRE) & TW_MASK];
le.push_back(p); // 把时间节点加入list中
}
}
// 延时调用
void AddTimeOutDelay(TimeWheel &tw, CTimerNode *p, size_t delay) {
if (p) {
p->IncRef();
TimeList &le = tw[(iRealTick + EXPIRE + delay) & TW_MASK];
le.push_back(p);
}
}
// 时间轮移动
void TimerShift(TimeWheel &tw) {
size_t tick = iRealTick;
iRealTick++;
TimeList &le = tw[tick & TW_MASK];
TimeListIter iter = le.begin();
for (; iter != le.end(); iter++) {
CTimerNode *p = *iter;
if (p && p->trySkill()){
delete p;
}
}
le.clear();
}
全部0条评论
快来发表一下你的评论吧 !