C++服务器框架:协程库——定时器模块

基于epoll超时实现定时器功能,精度毫秒级,支持在指定超时时间结束之后执行回调函数。

1 定时器概述

  本项⽬的定时器是为了实现协程调度器对定时任务的调度,服务器上经常要处理定时事件,⽐如3秒后关闭⼀个连 接,或是定期检测⼀个客户端的连接状态,定时器也是后⾯hook模块的基础。

  定时事件依赖于Linux提供的定时机制,它是驱动定时事件的原动⼒,⽬前Linux提供了以下几种可供程序利⽤的定时机制:

  1. alarm()setitimer(),这俩的本质都是先设置⼀个超时时间,然后等SIGALARM信号触发,通过捕获信号来判断超时
  2. 套接字超时选项,对应SO_RECVTIMEOSO_SNDTIMEO,通过errno来判断超时
  3. 多路复⽤超时参数,select/poll/epoll都支持设置超时参数,通过判断返回值为0来判断超时
  4. timer_create系统接⼝,实质也是借助信号,参考man 2 timer_create
  5. timerfd_create系列接⼝,通过判断⽂件描述符可读来判断超时,可配合IO多路复⽤,参考man 2 timerfd_create

  服务器程序通常需要处理众多定时事件,如何有效地组织与管理这些定时事件对服务器的性能至关重要。为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,比如链表、排序链表和时间轮,将所有定时器串联起来,以实现对定时事件的统一管理,每个定时器通常至少包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务回调函数。除此外,定时器还可以包括回调函数参数及是否自动重启等信息。

  有两种高效管理定时器的容器:时间轮和时间堆,sylar使用时间堆的方式管理定时器。

1.1 几种定时器的实现

1.1.1 基于升序链表的定时器

  1. 所有定时器组织成链表结构,链表成员包含超时时间,回调函数,回调函数参数,以及链表指针域。
  2. 定时器在链表中按超时时间进行升序排列,超时时间短的在前,长的在后。每次添加定时器时,都要按超时时间将定时器插入到链表的指定位置。
  3. 程序运行后维护一个周期性触发的tick信号,比如利用alarm函数周期性触发ALARM信号,在信号处理函数中从头遍历定时器链表,判断定时器是否超时。如果定时器超时,则记录下该定时器,然后将其从链表中删除。
  4. 执行所有超时的定时器的回调函数。

  以上就是一个基于升序链表的定时器实现,这种方式添加定时器的时间复杂度是O(n),删除定时器的时间复杂度是O(1),执行定时任务的时间复杂度是O(1)tick信号的周期对定时器的性能有较大的影响。当tick信号周期较小时,定时器精度高,但CPU负担较高,因为要频繁执行信号处理函数;当tick信号周期较大时,CPU负担小,但定时精度差。当定时器数量较多时,链表插入操作开销比较大。

1.1.2 时间轮

与上面的升序链表实现方式类似,也需要维护一个周期性触发的tick信号,但不同的是,定时器不再组织成单链表结构,而是按照超时时间,通过散列分布到不同的时间轮上,像下面这样:

test

  上面的时间轮包含N个槽位,每个槽位上都有一个定时器链表。时间轮以恒定的速度顺时针转动,每转一步,表盘上的指针就指向下一个槽位。每次转动对应一个tick,它的周期为si,一个共有N个槽,所以它运转一周的时间是N*si。每个槽位都有一条定时器链表,同一条链表上的每个定时器都具有相同的特征:前后节点的定时时间相差N*si的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表上。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(time slot)对应的链表中:ts = (cs + (ti / si)) % N

  时间轮通过哈希表的思想,将定时器散列到不同的链表上,每个链表的定时器数目都明显少于原来的排序链表,插入效率基本不受定时器数目的影响。和升序链表一样,tick的周期将影响定时器精度和CPU负载,除此外,时间轮上的槽数量N还对定时器的效率有影响,N越大,则散列越均匀,插入效率越高,N越小,则散列越容易冲突,至N等于1时,时间轮将完全退化成升序链表。

  上面的时间轮只有一个轮子,而复杂的时间轮可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈,精度低的仅往前移动一槽,就像水表一样。

单个槽上的定时器链表仍然是按升序链表来组织的,只不过前后两个节点的时间差一定是N*si的整数倍。注意这里前后节点的时间差不一定是1个N*si,也有可能是好几个N*si,所以不能通过定时器所在的槽位和链表位置直接推算出定时器的超时时间。或者换个说法,表盘指针转到某个槽时,仍需要按升序链表的方式遍历这个链表的节点,并判断是否超时。

1.1.3 时间堆

  上面的两种定时器设计都依赖一个固定周期触发的tick。设计定时器的另一种实现思路是直接将超时时间当作tick周期,具体操作是每次都取出所有定时器中超时时间最小的超时值作为一个tick,这样,一旦tick触发,超时时间最小的定时器必然到期。处理完已超时的定时器后,再从剩余的定时器中找出超时时间最小的一个,并将这个最小时间作为下一个tick,如此反复,就可以实现较为精确的定时。最小堆很适合处理这种定时方案,将所有定时器按最小堆来组织,可以很方便地获取到当前的最小超时时间,sylar采取的即是这种方案。

1.2 定时器设计

  sylar的定时器采用最小堆设计,所有定时器根据绝对的超时时间点进行排序,每次取出离当前时间最近的一个超时时间点,计算出超时需要等待的时间,然后等待超时。超时时间到后,获取当前的绝对时间点,然后把最小堆里超时时间点小于这个时间点的定时器都收集起来,执行它们的回调函数。

注意,在注册定时事件时,一般提供的是相对时间,比如相对当前时间3秒后执行。sylar会根据传入的相对时间和当前的绝对时间计算出定时器超时时的绝对时间点,然后根据这个绝对时间点对定时器进行最小堆排序。因为依赖的是系统绝对时间,所以需要考虑校时因素,这点会在后面讨论。

  sylar定时器的超时等待基于epoll_wait,精度只支持毫秒级,因为epoll_wait的超时精度也只有毫秒级。关于定时器和IO协程调度器的整合。IO协程调度器的idle协程会在调度器空闲时阻塞在epoll_wait上,等待IO事件发生。在之前的代码里,epoll_wait具有固定的超时时间,这个值是5秒钟。加入定时器功能后,epoll_wait的超时时间改用当前定时器的最小超时时间来代替。epoll_wait返回后,根据当前的绝对时间把已超时的所有定时器收集起来,执行它们的回调函数。由于epoll_wait的返回并不一定是超时引起的,也有可能是IO事件唤醒的,所以在epoll_wait返回后不能想当然地假设定时器已经超时了,而是要再判断一下定时器有没有超时,这时绝对时间的好处就体现出来了,通过比较当前的绝对时间和定时器的绝对超时时间,就可以确定一个定时器到底有没有超时。

2 定时器实现

  sylar的定时器对应Timer类,这个类的成员变量包括定时器的绝对超时时间点,是否重复执行,回调函数,以及一个指向TimerManager的指针,提供cancel/reset/refresh方法用于操作定时器。构造Timer时可以传入超时时间,也可以直接传入一个绝对时间。Timer的构造函数被定义成私有方式,只能通过TimerManager类来创建Timer对象。除此外,Timer类还提供了一个仿函数Comparator,用于比较两个Timer对象,比较的依据是绝对超时时间。

2.1 Timer

  成员变量:

1
2
3
4
5
bool m_recurring = false;           // 是否循环定时器
uint64_t m_ms = 0; // 执行周期
uint64_t m_next = 0; // 精确的执行时间
std::function<void()> m_cb; // 定时器回调函数
TimerManager* m_manager = nullptr; // 定时器管理器指针

2.1.1 Timer()

  构造函数只能由TimerManager创建Timer,构造函数有两个重载,一个是传入超时时间,一个是传入绝对时间。

1
2
3
4
5
6
7
8
//  传入超时时间
Timer::Timer(uint64_t ms, std::function<void()> cb, bool recurring, TimerManager* manager)
:m_recurring(recurring)
,m_ms(ms)
,m_cb(cb)
,m_manager(manager) { m_next = sylar::GetElapsedMS() + m_ms; }
// 传入绝对时间
Timer::Timer(uint64_t next) : m_next(next) {}

2.1.2 cancel()

  取消定时器,只是将m_next设置为0,并不会立即删除定时器,而是等到下次超时时再判断是否需要执行回调函数。

1
2
3
4
5
6
7
8
9
10
bool Timer::cancel() {
TimerManager::RWMutexType::WriteLock lock(m_manager -> m_mutex);
if(m_cb) {
m_cb = nullptr;
auto it = m_manager -> m_timers.find(shared_from_this()); // 从定时器集合中查找当前定时器
m_manager -> m_timers.erase(it);
return true;
}
return false;
}

2.1.3 refresh()

  刷新定时器,重新设置m_next,并将定时器插入到定时器管理器的定时器集合中。

1
2
3
4
5
6
7
8
9
10
11
12
bool Timer::refresh() {
TimerManager::RWMutexType::WriteLock lock(m_manager -> m_mutex);
if(!m_cb) return false; // 如果回调函数为空,直接返回

auto it = m_manager -> m_timers.find(shared_from_this());// step1: 从定时器集合中查找当前定时器
if(it == m_manager -> m_timers.end()) return false; // 如果定时器不在定时器集合中,直接返回

m_manager -> m_timers.erase(it); // step2: 从定时器集合中删除当前定时器
m_next = sylar::GetElapsedMS() + m_ms; // step3: 重新计算下次执行时间
m_manager -> m_timers.insert(shared_from_this());// step4: 将当前定时器重新添加到定时器集合中
return true;
}

2.1.4 reset()

  重置定时器,取消定时器并重新设置超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bool Timer::reset(uint64_t ms, bool from_now) {
if(ms == m_ms && !from_now) return true; // 如果定时器执行周期和当前周期相同,并且不是从当前时间开始计算,已经是最新的定时器,直接返回true
TimerManager::RWMutexType::WriteLock lock(m_manager -> m_mutex);
if(!m_cb) return false; // 如果回调函数为空,直接返回

auto it = m_manager -> m_timers.find(shared_from_this()); // step1: 从定时器集合中查找当前定时器
if(it == m_manager -> m_timers.end()) return false; // 如果定时器不在定时器集合中,直接返回

m_manager -> m_timers.erase(it); // step2: 从定时器集合中删除当前定时器
uint64_t start_t = 0;
if(from_now) start_t = sylar::GetElapsedMS(); // step3: 如果是从当前时间开始计算,直接使用当前时间
else start_t = m_next - m_ms; // 否则,重新计算执行时间
m_ms = ms;
m_next = start_t + m_ms; // step4: 重置的时间为开始时间+执行周期
m_manager -> m_timers.insert(shared_from_this()); // step5: 将当前定时器重新添加到定时器集合中
return true;
}

2.1.5 Comparator

  仿函数,用于比较两个定时器的超时时间,返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 定时器比较仿函数
struct Comparator {
/**
* @brief 重载()操作符,用于定时器的比较、排序
* @details 比较两个定时器的执行时间,按照执行时间从小到大排序,如果执行时间相同,地址小的排在前面
* @param[in] lhs 定时器指针_left hand side
* @param[in] rhs 定时器指针_right hand side
*/
bool operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const;
};

// 仿函数的实现
bool Timer::Comparator::operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const {
if(lhs == rhs) return false;
if(!lhs || lhs -> m_next < rhs -> m_next) return true;
if(!rhs || rhs -> m_next < lhs -> m_next) return false;
return lhs.get() < rhs.get(); // 如果执行时间相同,比较两个指针的地址
}

2.2 TimerManager

  所有的Timer对象都由TimerManager类进行管理,TimerManager包含一个std::set类型的Timer集合,这个集合就是定时器的最小堆结构,因为set里的元素总是排序过的,所以总是可以很方便地获取到当前的最小定时器。TimerManager提供创建定时器,获取最近一个定时器的超时时间,以及获取全部已经超时的定时器回调函数的方法,并且提供了一个onTimerInsertedAtFront()方法,这是一个虚函数,由IOManager继承时实现,当新的定时器插入到Timer集合的首部时,TimerManager通过该方法来通知IOManager立刻更新当前的epoll_wait超时。TimerManager还负责检测是否发生了校时,由detectClockRollover方法实现。

  成员变量:

1
2
3
4
RWMutexType m_mutex;       
std::set<Timer::ptr, Timer::Comparator> m_timers; // 定时器集合
bool m_tickled = false; // 是否触发onTimerInsertedAtFront
uint64_t m_previousTime = 0; // 上次执行时间

2.2.1 TimerManager()

  构造函数,初始化m_previousTime为当前时间。

1
2
3
TimerManager::TimerManager() {
m_previousTime = sylar::GetElapsedMS();
}

2.2.2 addTimer()

  添加定时器,返回一个Timer对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Timer::ptr TimerManager::addTimer(uint64_t ms, std::function<void()> cb, bool recurring) {
Timer::ptr timer(new Timer(ms, cb, recurring, this)); // 创建一个定时器,返回智能指针
RWMutexType::WriteLock lock(m_mutex);
addTimer(timer, lock); // 添加定时器
return timer;
}

void TimerManager::addTimer(Timer::ptr val, RWMutexType::WriteLock& lock) {
auto it = m_timers.insert(val).first; // 将定时器添加到定时器集合中, .first表示返回一个pair,pair的first表示定时器的迭代器
bool at_front = (it == m_timers.begin()) && !m_tickled; // 判断是否是最早执行的定时器
if(at_front) m_tickled = true; // 如果是最早执行的定时器,设置m_tickled为true
lock.unlock(); // 解锁
if(at_front) onTimerInsertedAtFront(); // 判断两次是为了在多线程环境中避免竞态条件,确保在解锁后,如果`at_front`仍为`true`
}

2.2.3 addConditionTimer()

  添加条件定时器,返回一个Timer对象。wear_ptr是一个弱引用,它不会增加所指对象的引用计数,也不会阻止所指对象被销毁。而shared_ptr是一种强引用,它会增加所指对象的引用计数,直到所有shared_ptr都被销毁才会释放所指对象的内存。在这段代码中,weak_cond是一个weak_ptr类型的对象,通过调用它的lock()方法可以得到一个shared_ptr类型的对象tmp,如果weak_ptr已经失效,则lock()方法返回一个空的shared_ptr对象。

1
2
3
4
5
6
7
8
9
static void OnTimer(std::weak_ptr<void> weak_cond, std::function<void()> cb) {
std::shared_ptr<void> tmp = weak_cond.lock(); // 使用weak_cond的lock函数获取一个shared_ptr指针tmp
if(tmp) cb(); // 如果tmp有效,执行回调函数
}

Timer::ptr TimerManager::addConditionTimer(uint64_t ms, std::function<void()> cb, std::weak_ptr<void> weak_cond, bool recurring) {
// 在定时器触发时会调用 OnTimer 函数,并在OnTimer函数中判断条件对象是否存在,如果存在则调用回调函数cb
return addTimer(ms, std::bind(&OnTimer, weak_cond, cb), recurring);
}

2.2.4 getNextTimer()

  获取最近一个定时器执行的时间间隔,返回值为0表示已经超时,否则返回还要多久执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
uint64_t TimerManager::getNextTimer() {
RWMutexType::ReadLock lock(m_mutex);
// 不触发 onTimerInsertedAtFront
m_tickled = false;
if(m_timers.empty()) return ~0ull; // 如果定时器集合为空,返回最大值, ~0ull表示无符号长整型最大值

const Timer::ptr& next = *m_timers.begin(); // 获取定时器集合中最早执行的定时器
uint64_t now_ms = sylar::GetElapsedMS();
// 如果当前时间 >= 该定时器的执行时间,说明该定时器已经超时了,该执行了
if(now_ms >= next->m_next) return 0;
// 还没超时,返回还要多久执行
return next -> m_next - now_ms;
}

2.2.5 listExpiredCb()

  获取所有已经超时的定时器,并执行它们的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void TimerManager::listExpiredCb(std::vector<std::function<void()>>& cbs) {
uint64_t now_ms = sylar::GetElapsedMS();
std::vector<Timer::ptr> expired;
{
RWMutexType::ReadLock lock(m_mutex);
if(m_timers.empty()) return;
}

RWMutexType::WriteLock lock(m_mutex);
if(m_timers.empty()) return;
bool rollover = false; // 是否发生了时间回拨
if(SYLAR_UNLIKELY(detectClockRollover(now_ms))) rollover = true; // 发生了时间回拨
// 如果没有发生时间回拨,并且第一个定时器都没有到执行时间,就说明没有任务需要执行
if(!rollover && ((*m_timers.begin()) -> m_next > now_ms)) return;

Timer::ptr now_timer(new Timer(now_ms)); // 以当前时间创建一个定时器
// 若发生了时间回拨,则将m_timers的所有Timer视为过期
// 否则返回第一个 >= now_ms的迭代器,在此迭代器之前的定时器全都已经超时
auto it = rollover ? m_timers.end() : m_timers.lower_bound(now_timer);
// 筛选出当前时间等于next时间执行的Timer
while(it != m_timers.end() && (*it) -> m_next == now_ms) {
++it;
}
// 小于当前时间的Timer都是已经过了时间的Timer,it是当前时间要执行的Timer
// 将这些Timer都加入到expired中
expired.insert(expired.begin(), m_timers.begin(), it);
// 将已经放入expired的定时器删掉
m_timers.erase(m_timers.begin(), it);
cbs.reserve(expired.size()); // 预留空间

for(auto& timer : expired) {
cbs.push_back(timer -> m_cb); // 将过期定时器的回调函数添加到回调函数集合中
if(timer -> m_recurring) { // 如果是循环定时器,则再次放入定时器集合中
timer -> m_next = now_ms + timer -> m_ms; // 重新计算下次执行时间
m_timers.insert(timer); // 将定时器重新添加到定时器集合中
} else timer -> m_cb = nullptr; // 如果定时器不是重复执行的,将回调函数置为空
}
}

2.2.6 hasTimer()

  判断定时器是否存在。

1
2
3
4
bool TimerManager::hasTimer() {
RWMutexType::ReadLock lock(m_mutex);
return !m_timers.empty();
}

2.2.7 detectClockRollover()

  检测是否发生了时间回拨,如果发生了时间回拨,需要将所有定时器视为过期。

```cpp bool TimerManager::detectClockRollover(uint64_t now_ms) { bool rollover = false; // 如果当前时间比上次执行时间还小 并且 小于一个小时的时间,认为发生了时间回拨 if(now_ms < m_previousTime && now_ms < (m_previousTime - 60 * 60 * 1000)) rollover = true; m_previousTime = now_ms; // 重新更新时间 return rollover; }

3 总结

  1. 创建定时器时只传入了相对超时时间,内部要先进行转换,根据当前时间把相对时间转化成绝对时间。

  2. sylar支持创建条件定时器,也就是在创建定时器时绑定一个变量,在定时器触发时判断一下该变量是否仍然有效,如果变量无效,那就取消触发。

  3. 关于onTimerInsertedAtFront()方法的作用。这个方法是IOManager提供给TimerManager使用的,当TimerManager检测到新添加的定时器的超时时间比当前最小的定时器还要小时,TimerManager通过这个方法来通知IOManager立刻更新当前的epoll_wait超时,否则新添加的定时器的执行时间将不准确。实际实现时,只需要在onTimerInsertedAtFront()方法内执行一次tickle就行了,tickle之后,epoll_wait会立即退出,并重新从TimerManager中获取最近的超时时间,这时拿到的超时时间就是新添加的最小定时器的超时时间了。

  4. 关于校时问题。sylar的定时器以gettimeofday()来获取绝对时间点并判断超时,所以依赖于系统时间,如果系统进行了校时,比如NTP时间同步,那这套定时机制就失效了。sylar的解决办法是设置一个较小的超时步长,比如3秒钟,也就是epoll_wait最多3秒超时,假设最近一个定时器的超时时间是10秒以后,那epoll_wait需要超时3次才会触发。每次超时之后除了要检查有没有要触发的定时器,还顺便检查一下系统时间有没有被往回调。如果系统时间往回调了1个小时以上,那就触发全部定时器。个人感觉这个办法有些粗糙,其实只需要换个时间源就可以解决校时问题,换成clock_gettime(CLOCK_MONOTONIC_RAW)的方式获取系统的单调时间,就可以解决这个问题了。

参考资料: