C++服务器框架:协程库——协程模块

基于ucontext_t实现非对称协程,本文只涉及协程,不涉及协程的调度。

1 协程模块概述

1.1 概念

首先大致了解以下什么是协程,可以参考以下链接:

  协程第一话 协程到底是怎样的存在?
  协程第二话 协程和IO多路复用更配哦~
  协程的近况、设计与实现中的细节和决策
  一文彻底弄懂C++开源协程库libco——原理及应用
  微信开源C++Libco介绍与应用

  可以将协程当成一种看起来花里胡哨,并且使用起来也花里胡哨的函数。每个协程在创建时都会指定一个入口函数,类似于线程,其本质是函数和函数运行状态的组合,协程和函数的不同在于,函数一旦被调用,只能从头开始执行,直到执行结束退出;而协程可以执行到一半就退出(yield),然后在后续恢复执行(resume),在此期间其他协程可以运行,因此协程也称为轻量级线程。

  协程能够在半路yield并重新resume的关键在于协程存储了函数在yield时间点的执行状态,称为协程上下文。协程上下文包含了函数在当前执行状态下的全部CPU寄存器的值,可通过ucontext_t结构体表示。协程和线程的区别在于,协程虽然称为轻量级线程,但在单线程内并不能并发执行,只能一个协程结束或yield后再执行另一个协程;而线程可以真正并发执行。因此,在协程内部不要使用线程级别的锁来做同步,否则可能导致整个线程锁死。单线程环境下,协程的yieldresume是同步进行的,一个协程的yield对应另一个协程的resume。协程的运行和调度由应用程序完成,与线程不同,线程的运行和调度由操作系统完成。

  创建协程是将一个函数包装成协程对象,然后以协程的方式运行;协程调度是创建一批协程对象,再创建一个调度协程,通过调度协程逐个执行协程对象;IO协程调度是在调度协程时,如果发现协程在等待IO就绪,暂时让出执行权,等IO就绪后重新恢复协程运行;定时器是预设一个协程对象,等定时时间到了就恢复该协程对象的运行。

1.2 进程、线程、协程的区别

1.2.1 从定义来看

  • 进程:进程是资源分配和拥有的基本单位。进程通过内存映射拥有独立的代码和数据空间,若没有内存映射给进程独立的空间,则没有进程的概念了。
  • 线程:线程是程序执行的基本单位。线程都处在一个进程空间中,可以相互访问,没有限制,所以使用线程进行多任务变成十分便利,所以当一个线程崩溃,其他任何一个线程都不能幸免。每个进程中都有唯一的主线程,且只能有一个,主线程和进程是相互依存的关系,主线程结束进程也会结束。
  • 协程:协程是用户态的轻量级线程,线程内部调度的基本单位。协程在线程上执行。

1.2.2 从调度来看

  • 进程:进程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换进程时需要切换虚拟内存空间,切换页表,切换内核栈以及硬件上下文等,开销非常大。

  • 线程:线程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换线程时需要保存和设置少量寄存器内容,开销很小。

  • 协程:协程由用户进行切换,并不会陷入内核态。先将寄存器上下文和栈保存,等切换回来的时候再进行恢复,上下文的切换非常快

1.2.3 从并发性来看

  • 进程:不同进程之间切换实现并发,各自占有CPU实现并行。
  • 线程:个进程内部的个线程并发执行。
  • 协程:同时间只能执行个协程,而其他协程处于休眠状态,适合对任务进行分时处理。

  通过协程,可以让程序按照我们的想法去运行,而不是从头到尾的执行下去。例如在执行一个函数时,可以通过yield退出,让出当前的CPU执行权,等到了合适的时候,通过resume重新恢复运行。 因为协程是在单线程上运行的,并不是并发执行的,是顺序执行的,所以不能使用锁来做协程的同步,这样会直接导致线程的死锁。

1.3 ucontext_t

协程模块基于ucontext_t实现,基本结构如下

1
2
3
4
5
6
7
8
#include <ucontext.h>
typedef struct ucontext_t {
 struct ucontext_t* uc_link;
 sigset_t uc_sigmask;
 stack_t uc_stack;
 mcontext_t uc_mcontext;
...
};
  • uc_link:为当前context执行结束之后要执行的下一个context,若uc_link为空,执行完当前context之后退出程序。
  • uc_sigmask:执行当前上下文过程中需要屏蔽的信号列表,即信号掩码
  • uc_stack:为当前context运行的栈信息。
    • uc_stack.ss_sp:栈指针指向stack, uc_stack.ss_sp = stack;
    • uc_stack.ss_size:栈大小, uc_stack.ss_size = stacksize;
  • uc_mcontext:保存具体的程序执行上下文,如PC值,堆栈指针以及寄存器值等信息。它的实现依赖于底层,是平台硬件相关的。此实现不透明。

相关函数:

1
2
3
4
5
#include <ucontext.h>
void makecontext(ucontext_t* ucp, void (*func)(), int argc, ...);
int swapcontext(ucontext_t* olducp, ucontext_t* newucp);
int getcontext(ucontext_t* ucp);
int setcontext(const ucontext_t* ucp);
  • makecontext:初始化一个ucontext_tfunc参数指明了该context的入口函数,argc为入口参数的个数,每个参数的类型必须是int类型。另外在makecontext前,一般需要显示的初始化栈信息以及信号掩码集同时也需要初始化uc_link,以便程序退出上下文后继续执行。
  • swapcontext:原子操作,该函数的工作是保存当前上下文并将上下文切换到新的上下文运行。
  • getcontext:将当前的执行上下文保存在CPU中,以便后续恢复上下文。
  • setcontext:将当前程序切换到新的context,在执行正确的情况下该函数直接切换到新的执行状态,不会返回。

注意:setcontext执行成功不返回,getcontext执行成功返回0,若执行失败都返回-1。若uc_linkNULL,执行完新的上下文之后程序结束。

2 协程模块设计

  使用非对称协程模型,也就是子协程只能和线程主协程切换,而不能和另一个子协程切换,并且在程序结束时,一定要再切回主协程,以保证程序能正常结束,像下面这样:

  在对称协程中,子协程可以直接和子协程切换,也就是说每个协程不仅要运行自己的入口函数代码,还要负责选出下一个合适的协程进行切换,相当于每个协程都要充当调度器的角色,这样程序设计起来会比较麻烦,并且程序的控制流也会变得复杂和难以管理。而在非对称协程中,可以借助专门的调度器来负责调度协程,每个协程只需要运行自己的入口函数,然后结束时将运行权交回给调度器,由调度器来选出下一个要执行的协程即可。非对称协程的行为与函数类似,因为函数在运行结束后也总是会返回调用者。

  虽然目前还没有涉及到协程调度,但这里其实可以将线程的主协程想像成线程的调度协程,每个子协程执行完了,都必须切回线程主协程,由主协程负责选出下一个要执行的子协程。如果子协程可以和子协程切换,那就相当于变相赋予了子协程调度的权利,这在非对称协程里是不允许的。项目中借助了线程局部变量的功能来实现协程模块。线程局部变量与全局变量类似,不同之处在于声明的线程局部变量在每个线程都独有一份,而全局变量是全部线程共享一份。

  项目中使用线程局部变量(C++11 thread_local变量)来保存协程上下文对象,这点很好理解,因为协程是在线程里运行的,不同线程的协程相互不影响,每个线程都要独自处理当前线程的协程切换问题。对于每个线程的协程上下文,项目中设计了两个线程局部变量来存储上下文信息(对应源码的t_fibert_thread_fiber),也就是说,一个线程在任何时候最多只能知道两个协程的上下文。又由于项目中只使用swapcontext来做协程切换,那就意味着,这两个线程局部变量必须至少有一个是用来保存线程主协程的上下文的,如果这两个线程局部变量存储的都是子协程的上下文,那么不管怎么调用swapcontext,都没法恢复主协程的上下文,也就意味着程序最终无法回到主协程去执行,程序也就跑飞了。

  如果将线程的局部变量设置成一个类似链表的数据结构,那理论上应该也可以实现对称协程,也就是子协程可以直接和子协程切换,但代码复杂度上肯定会增加不少,因为要考虑多线程和公平调度的问题。项目中的非对称协程代码实现简单,并且在后面实现协程调度时可以做到公平调度,缺点是子协程只能和线程主协程切换,意味着子协程无法创建并运行新的子协程,并且在后面实现协程调度时,完成一次子协程调度需要额外多切换一次上下文。

2.1 协程状态

  协程状态在原项目基础上进行简化,对每个协程,只设计了3种状态,分别是READY——就绪态,RUNNING——正在运行,TERM——运行结束。去掉了INIT状态、HOLD状态和EXCEPT状态。

  INIT状态是协程对象刚创建时的状态,这个状态可以直接归到READY状态里。HOLD状态和READY状态与协程调度有关,READY状态的协程会被调度器自动重新调度,而HOLD状态的协程需要显式地再次将协程加入调度,这两个状态也可以归到READY状态里,都表示可执行状态。EXCEPT状态,表示协程入口函数执行时出现异常的状态,这个状态可以不管,具体到协程调度模块再讨论。

去掉这几个状态后,协程的状态模型就简单得一目了然了,一个协程要么正在运行(RUNNING),要么准备运行(READY),要运行结束(TERM)。

  状态简化后,唯一的缺陷是无法区分一个READY状态的协程对象是刚创建,还是已经运行到一半yield了,这在重置协程对象时有影响。重置协程时,如果协程对象只是刚创建但一次都没运行过,那应该是允许重置的,但如果协程的状态是运行到一半yield了,那应该不允许重置。虽然可以把INIT状态加上以区分READY状态,但既然简化了状态,那就简化到底,让协程只有在TERM状态下才允许重置,问题迎刃而解。

2.2 协程原语

  对于非对称协程来说,协程除了创建语句外,只有两种操作,一种是resume,表示恢复协程运行,一种是yield,表示让出执行。协程的结束没有专门的操作,协程函数运行结束时协程即结束,协程结束时会自动调用一次yield以返回主协程。

3 Fiber类实现

  • 成员变量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// id
uint64_t m_id = 0;
/// 栈大小
uint32_t m_stacksize = 0;
/// 状态
State m_state = READY;
/// 上下⽂
ucontext_t m_ctx;
/// 栈地址
void *m_stack = nullptr;
/// 入口函数
std::function<void()> m_cb;
/// 是否参与调度
bool m_runInScheduler;
  • 两个全局静态变量,用于生成协程id和统计当前的协程数量:
1
2
3
4
/// 全局静态变量,用于生成协程id
static std::atomic<uint64_t> s_fiber_id{0};
/// 全局静态变量,用于统计当前的协程数
static std::atomic<uint64_t> s_fiber_count{0};
  • 对于每个线程,都有以下两个线程局部变量用于保存协程上下文信息:
1
2
3
4
/// 线程局部变量,当前线程正在运行的协程
static thread_local Fiber *t_fiber = nullptr;
/// 线程局部变量,当前线程的主协程,切换到这个协程,就相当于切换到了主线程中运行,智能指针形式
static thread_local Fiber::ptr t_thread_fiber = nullptr;

这两个线程局部变量保存的协程上下文对协程的实现至关重要,分别的作用如下:

  • t_fiber:保存当前正在运行的协程指针,必须时刻指向当前正在运行的协程对象。协程模块初始化时,t_fiber指向线程主协程对象。

  • t_thread_fiber:保存线程主协程指针,智能指针形式。协程模块初始化时,t_thread_fiber指向线程主协程对象。当子协程resume时,通过swapcontext将主协程的上下文保存到t_thread_fiberucontext_t成员中,同时激活子协程的ucontext_t上下文。当子协程yield时,从t_thread_fiber中取得主协程的上下文并恢复运行。

3.1 构造函数

  Fiber类提供了两个构造函数,无参构造函数用于初始化当前线程的协程功能,构造线程主协程对象,以及对t_fibert_thread_fiber进行赋值。有参构造函数用于构造子协程,初始化子协程的ucontext_t上下文和栈空间,要求必须传入协程的入口函数,以及可选的协程栈大小。

  无参构造函数被定义成私有方法,不允许在类外部调用,只能通过GetThis()方法,在返回当前正在运行的协程时,如果发现当前线程的主协程未被初始化,那就用不带参的构造函数初始化线程主协程。

3.2 无参构造函数

  ⽆参构造函数只⽤于创建线程的第⼀个协程,也就是线程主函数对应的协程。

1
2
3
4
5
6
7
8
9
10
11
12
Fiber::Fiber() {
SetThis(this); // 设置当前协程
m_state = RUNNING;
// 获取当前协程的上下文信息保存到m_ctx中
if (getcontext( &m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
++s_fiber_count;
m_id = s_fiber_id++;

SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() main id = " << m_id;
}

3.3 有参构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool run_in_scheduler)
: m_id(s_fiber_id++)
, m_cb(cb)
, m_runInScheduler(run_in_scheduler){

++s_fiber_count;
// 若给了初始化值则用给定值,若没有则用约定值
m_stacksize = stacksize ? stacksize : g_fiber_stack_size -> getValue();
// 获得协程运行指针
m_stack = StackAllocator::Alloc(m_stacksize);
// 保存当前协程上下文信息到m_ctx中
if(getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
// uc_link置空,执行完当前context之后退出程序。
m_ctx.uc_link = nullptr;
// 初始化栈指针
m_ctx.uc_stack.ss_sp = m_stack;
// 初始化栈大小
m_ctx.uc_stack.ss_size = m_stacksize;
// 指明该context入口函数
makecontext(&m_ctx, &Fiber::MainFunc, 0);

SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() id = " << m_id;

}

3.2 ~Fiber()

  析构函数,用于释放协程的栈空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Fiber::~Fiber() {
SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber() id = " << m_id;
--s_fiber_count;
// 根据栈内存是否为空,进行不同的释放操作
if (m_stack) {
// 有栈,子协程, 需确保子协程为结束状态
SYLAR_ASSERT(m_state == TERM);
// 释放运行栈
StackAllocator::Dealloc(m_stack, m_stacksize);
SYLAR_LOG_DEBUG(g_logger) << "Dealloc Stack, id = " << m_id;
} else {
// 无栈,主协程, 释放要保证没有任务并且当前正在运行
SYLAR_ASSERT(!m_cb);
SYLAR_ASSERT(m_state == RUNNING);

Fiber* cur = t_fiber;
//若当前协程为主协程,将当前协程置为空
if (cur == this) {
SetThis(nullptr);
}
}
}

3.3 reset()

  重置协程,就是重复利用已结束的协程,复用其栈空间,创建新协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Fiber::reset(std::function<void()> cb) {
// 主协程不分配栈空间
SYLAR_ASSERT(m_stack);
// 当前协程在结束状态
SYLAR_ASSERT(m_state == TERM);
m_cb = cb;
if (getcontext(&m_ctx)) SYLAR_ASSERT2(false, "getcontext");

m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;

makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = READY;
}

3.4 resume()

  将当前协程切换到运⾏状态,和正在运行的协程进行交换,前者状态变为RUNNING,后者状态变为READY

1
2
3
4
5
6
7
8
9
void Fiber::resume() {
SYLAR_ASSERT(m_state != TERM && m_state != RUNNING);
SetThis(this);
m_state = RUNNING;

if (swapcontext(&(t_thread_fiber->m_ctx), &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

3.5 yield()

  当前协程让出执行权。当前协程与上次resume时退到后台的协程进行交换,前者状态变为READY,后者状态变为RUNNING

1
2
3
4
5
6
7
8
9
10
11
12
void Fiber::yield() {
/// 协程运行完之后会自动yield一次,用于回到主协程,此时状态已为结束状态
SYLAR_ASSERT(m_state == RUNNING || m_state == TERM);
SetThis(t_thread_fiber.get());
if (m_state != TERM) {
m_state = READY;
}

if (swapcontext(&m_ctx, &(t_thread_fiber->m_ctx))) {
SYLAR_ASSERT2(false, "swapcontext");
}
}

  在非对称协程里,执行resume时的当前执行环境一定是位于线程主协程里,所以这里的swapcontext操作的结果把主协程的上下文保存到t_thread_fiber -> m_ctx中,并且激活子协程的上下文;而执行yield时,当前执行环境一定是位于子协程里,所以这里的swapcontext操作的结果是把子协程的上下文保存到协程自己的m_ctx中,同时从t_thread_fiber获得主协程的上下文并激活。

3.6 GetThis()

  返回当前线程正在执行的协程,如果当前线程还未创建协程,则创建线程的第一个协程,且该协程为当前线程的主协程,其他协程都通过这个协程来调度。也就是说,其他协程结束时,都要切回到主协程,由主协程重新选择新的协程进行resume

1
2
3
4
5
6
7
8
9
10
11
12
13
Fiber::ptr Fiber::GetThis() {

if (t_fiber) {
return t_fiber -> shared_from_this();
}

///如果当前线程还未创建协程,则创建线程的第一个协程
Fiber::ptr main_fiber(new Fiber);
// 此时当前协程应该为主协程
SYLAR_ASSERT(t_fiber == main_fiber.get());
t_thread_fiber = main_fiber;
return t_fiber -> shared_from_this();
}

3.7 MainFunc()

  协程的入口函数,用于执行协程的回调函数。在用户传入的协程入口函数上进行了一次封装,这个封装类似于线程模块的对线程入口函数的封装。通过封装协程入口函数,可以实现协程在结束自动执行yield的操作。

1
2
3
4
5
6
7
8
9
10
11
12
void Fiber::MainFunc() {
Fiber::ptr cur = GetThis(); //GetThis()的shared_from_this()⽅法让引用计数+1
SYLAR_ASSERT(cur);

cur -> m_cb();//这里真正执行协程的入口函数
cur -> m_cb = nullptr;
cur -> m_state = TERM;

auto raw_ptr = cur.get();//手动让t_fiberde的引用计数-1
cur.reset();
raw_ptr -> yield();// 协程结束时⾃动yield, 切换到主协程
}

3.8 其他细节

  • 关于协程id,通过全局静态变量s_fiber_id的自增来生成协程id,每创建一个新协程,s_fiber_id自增1,并作为新协程的id(实际是先取值,再自增1)。

  • 关于线程主协程的构建,线程主协程代表线程入口函数或是main函数所在的协程,这两种函数都不是以协程的手段创建的,所以它们只有ucontext_t上下文,但没有入口函数,也没有分配栈空间。

  • 关于协程切换,子协程的resume操作一定是在主协程里执行的,主协程的resume操作一定是在子协程里执行的,这点完美和swapcontext匹配,参考上面协程原语的实现。

  • 关于智能指针的引用计数,由于t_fibert_thread_fiber一个是原始指针一个是智能指针,混用时要注意智能指针的引用计数问题,不恰当的混用可能导致协程对象已经运行结束,但未析构问题。关于协程对象的智能指针引用计数跟踪可参考test_fiber.cc

  • 非对称协程,子协程不能创建并运行新的子协程,否则会导致程序无法回到主协程,从而无法正常结束。