C++服务器框架:协程库——I/O协程调度

继承自协程调度器,封装了epoll,支持为socket fd注册读写事件回调函数。

1 I/O协程调度概述

  I/O事件调度功能对服务器开发⾄关重要,因为服务器通常需要处理⼤量来⾃客户端的socket fd,使⽤I/O事件调度可以将开发者从判断socket fd是否可读或可写的⼯作中解放出来,使得程序员只需要关⼼socket fdI/O操作,实现I/O协程调度意义重⼤。

  I/O协程调度可以看成是增强版的协程调度。在前⾯的协程调度模块中,调度器对协程的调度是⽆条件执⾏的,在调度器已经启动调度的情况下,任务⼀旦添加成功,就会排队等待调度器执⾏。调度器不⽀持删除调度任务,并且调度器在正常退出之前⼀定会执⾏完全部的调度任务,所以在某种程度上可以认为,把⼀个协程添加到调度器的任务队列,就相当于调⽤了协程的resume⽅法。

  I/O协程调度支持协程调度的全部功能,因为I/O协程调度器是直接继承协程调度器实现的。除了协程调度,I/O协程调度还增加了I/O事件调度的功能,这个功能是针对描述符(一般是套接字描述符)的。I/O协程调度支持为描述符注册可读和可写事件的回调函数,当描述符可读或可写时,执行对应的回调函数。(这里可以直接把回调函数等效成协程,所以这个功能被称为I/O协程调度)。

  很多的库都可以实现类似的工作,比如libeventlibuvlibev等,这些库被称为异步事件库或异步I/O库。有的库不仅可以处理socket fd事件,还可以处理定时器事件和信号事件。这些事件库的实现原理基本类似,都是先将套接字设置成非阻塞状态,然后将套接字与回调函数绑定,接下来进入一个基于I/O多路复用的事件循环,等待事件发生,然后调用对应的回调函数。

  sylar在协程调度模块的基础上,封装了epoll。支持对I/O事件的调度功能,可以为socket句柄添加读事件(EPOLLIN)和写事件(EPOLLOUT),并且支持删除事件功能。I/OManager主要通过FdContext结构体存储文件描述符fd, 注册的事件event,执行任务cb/fiber,其中fdevent用于epoll_waitcb/fiber用于执行任务。当有任务时,使用管道pipe来唤醒epoll_wait先执行其他任务。

1.2 相关概念

  在I/O操作时,操作系统通常会将数据缓存在文件系统的页缓存(page cache)中。这意味着数据的传输过程分为两个阶段:

  • 第一阶段:数据首先被拷贝到操作系统内核的缓冲区中。
  • 第二阶段:数据从操作系统内核的缓冲区拷贝到应用程序的地址空间。

1.2.1 阻塞I/O

在默认情况下,所有的socket都是被阻塞的,也就是阻塞I/O,这样会导致两个阶段的阻塞: - 进程可能需要等待数据到达。 - 数据从内核拷贝到用户空间时,进程也可能被阻塞。

1.2.2 非阻塞I/O

  通过设置,可以将socket变为非阻塞模式(non-blocking),在非阻塞模式下:

  • 如果内核还没有准备好数据,那么它不会阻塞用户进程,而是立即返回一个错误。
  • 当内核准备好数据并且再次收到用户进程的system call时,数据会被拷贝到用户内存。然而,这个拷贝过程(第二阶段)仍然可能导致进程被阻塞。

1.2.3 异步I/O

  异步I/OAsynchronous I/O)允许在两个阶段都不会阻塞用户进程:

  • 用户进程发起read操作后,内核收到system call会立即返回,不会阻塞用户进程。
  • 当内核准备好数据并完成拷贝到用户空间后,内核会通过某种机制(如信号或回调函数)通知用户进程,整个过程中用户进程不会被阻塞。

  异步I/O是真正的异步操作,它在两个阶段都不会阻塞用户进程。阻塞I/O在两个阶段都可能阻塞用户进程。非阻塞I/O在第一阶段不阻塞用户进程,但在第二阶段拷贝数据时可能会阻塞。I/O多路复用是同步I/O的一种形式,它允许进程同时监视多个I/O操作,但在I/O操作就绪时,进程仍然需要被阻塞以处理这些操作。

2 I/O多路复用

  服务器需要与多个客户端建立连接时,会涉及处理大量的socket文件描述符。为了有效管理这些文件描述符并进行I/O操作,可以利用I/O多路复用技术。

当用户进程调用select函数时,整个进程会被阻塞。同时,内核会监视所有由select负责的socket文件描述符。一旦任何一个socket中的数据准备好了,select就会返回,此时用户进程可以调用recvfrom函数来接收数据。内核收到系统调用后将数据拷贝到用户进程中。

在Linux中,主要有三种常用的I/O多路复用方式:selectpollepoll。通常情况下,将socket设置为非阻塞模式(O_NONBLOCK),这样在进行I/O操作时,用户进程虽然仍然会被阻塞,但是是被select函数阻塞,而不是被socket I/O阻塞。

  • select返回了,那一定是socket中的数据准备好了, recvfrom也不会阻塞了,所以设不设置socket为非阻塞模式似乎没什么区别?

  将socket设置为非阻塞模式(使用O_NONBLOCK标志)可以确保即使select报告socket可读,recvfrom调用也不会阻塞。这是因为在非阻塞模式下,如果数据没有准备好,recvfrom会立即返回一个错误,而不是等待数据。此外,在Linux下,select可能会将socket报告为“准备读取”,即使实际上并没有数据可读。这可能是由于多种原因,包括但不限于网络条件、socket状态的变化,或者是内核内部的实现细节。因此,即使select返回,也不能保证recvfrom调用一定会成功。最后,在不应阻塞的socket上使用非阻塞模式(O_NONBLOCK)可以避免潜在的阻塞,确保应用程序能够继续响应其他事件或进行其他操作。

2.1 select

1
2
3
4
5
6
7
8
/*
@param: n 最大文件描述符+1
@param: readfds 读文件描述符
@param writefds 写文件描述符
@param exceptfds 异常文件描述符
@param timeout 超时事件
*/
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

当进程调用select时将会被阻塞,fd_set的数据结构为bitmap,通过FD_SET方法将需要监听的文件描述符集合fdset对应的bitmap置为1(例如文件描述符集合为49,那么就将bitmap的第4位和第9位置为1),select会截取bitmapn位进行监听。select会将需要关注的fd_set拷贝到内核态监听,当有数据来时,内核将有数据的fd_set置位(bitmap对应的文件描述符置位为相应的操作,读、写、异常),select返回。因为不知道是哪个文件描述符来数据了,所以再遍历fdset寻找就绪的文件描述符。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。

select的缺点: 1. 单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。 2. fd_set是不可重用的,每次需要使用FD_ZERO方法清空。 3. 每次调用select都需要将fd_set拷贝到内核态,有开销。 4. 不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为O(n)

2.2 poll

1
2
3
4
5
6
7
8
9
10
11
12
/*
param fds fd事件
param nfds fd数量
param timeout 超时时间
*/
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

struct pollfd {
   int fd; /* file descriptor */
   short events; /* requested events to watch */
   short revents; /* returned events witnessed */
};

poll数据结构select不同poll采用数组存储pollfd,并将fd和关注的事件(POLLIN等)分别保存到pollfdfdevents中。

pollselect工作原理相同,但要注意的是,当数据来时,pollrevents置位(POLLIN等),然后poll函数返回。仍然要遍历数组来看是哪个文件描述符来了,并且将revents置为0,这样就能重复使用pollfd

poll优点:

  1. 解决了select1024上限。
  2. 解决了select fd_set不可重用,pollfd可以通过重置revents恢复如初。

poll缺点:

  1. 每次调用poll都需要将pollfd拷贝到内核态,有开销。
  2. 不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为O(n)

2.3 epoll

  epoll是在2.6内核中提出的,是之前的selectpoll的增强版本。相对于selectpoll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

  epoll通过以下3个接口操作:

更多详细内容可以查看深度Linux - epoll性能那么高,为什么?

2.3.1 epoll_create

1
2
//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_create(int size)

  创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。但看了源码,只要size大于0就可以了,没有实质性的作用。

  当创建好epoll句柄后,它就会占用一个fd值,在Linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

  通过源码得知,每创建一个epollfd,内核就会分配一个eventpoll结构体与之对应,其中维护了一个RBTree来存放所有要监听的struct epitem(表示一个被监听的fd)。

2.3.2 epoll_ctl

  从用户空间将epoll_event结构copy到内核空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
@param epfd epoll_create()的返回值
@param op 添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD事件
@param event 告诉内核需要监听什么事
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
   
struct epoll_event {
 __uint32_t events;  /* Epoll events */
 epoll_data_t data;  /* User data variable */
};

/*
* epoll事件关联数据的联合体
* fd: 表示关联的文件描述符。
* ptr:表示关联的指针。
*/
typedef unI/On epoll_data {
   void *ptr;
   int fd;
   __uint32_t u32;
   __uint64_t u64;
} epoll_data_t;

events可以是以下几个宏的集合: - EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); - EPOLLOUT:表示对应的文件描述符可以写; - EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); - EPOLLERR:表示对应的文件描述符发生错误; - EPOLLHUP:表示对应的文件描述符被挂断; - EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 - EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

  通过源码得知,同一个fd不能重复添加。内核会自动添加这两个事件epds.events |= POLLERR | POLLHUP;并且使用copy_from_user从用户空间将epoll_event结构copy到内核空间。

1
2
if (ep_op_has_event(op) &&
   copy_from_user(&epds, event, sizeof(struct epoll_event)))

2.3.3 epoll_wait

1
2
3
4
5
6
7
8
9
10
11
/*
@param epfd epoll_create() 返回的句柄
@param events 分配好的 epoll_event 结构体数组,epoll 将会把发生的事件复制到 events 数组中
 events不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存,但是内核会检查空间是否合法
@param maxevents 表示本次可以返回的最大事件数目,通常 maxevents 参数与预分配的 events 数组的大小是相等的;
@param timeout 表示在没有检测到事件发生时最多等待的时间(单位为毫秒)
  如果 timeout 为 0,则表示 epoll_wait 在 rdllist 链表为空时,立刻返回,不会等待。
  rdllist:所有已经ready的epitem(表示一个被监听的fd)都在这个链表里面
 
*/
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  收集在epoll监控的事件中已经发生的事件,如果epoll中没有任何一个事件发生,则最多等待timeout毫秒后返回。epoll_wait的返回值表示当前发生的事件个数,如果返回0,则表示本次调用中没有事件发生,如果返回-1,则表示发生错误,需要检查errno判断错误类型。

  通过源码得知,通过__put_user将数据从内核空间拷贝到用户空间。

1
2
3
4
5
if (__put_user(revents, &uevent->events) ||
   __put_user(epi->event.data, &uevent->data)) {
   list_add(&epi->rdllink, head);
   return eventcnt ? eventcnt : -EFAULT;
}

2.3.4 epoll工作模式

  epoll有两种工作模式,LT(水平触发)模式与ET(边缘触发)模式。默认情况下,epoll采用LT模式工作。两个的区别是:

  • Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用epoll_wait 时,它还会通知在没读写完的文件描述符上继续读写,当然如果一直不去读写,会一直通知。如果系统中有大量不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。

  • Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait 会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait时,它不会通知,也就是它只会通知一次,直到该文件描述符上出现第二次可读写事件才会再次通知。这种模式比水平触发效率高,系统不会充斥大量用户不关心的就绪文件描述符。

  在LT模式下开发基于epoll的应用要简单一些,不太容易出错,而在ET模式下事件发生时,如果没有彻底地将缓冲区的数据处理完,则会导致缓冲区的用户请求得不到响应。注意,默认情况下Nginx采用ET模式使用epoll的。

2.3.5 epoll优点

  1. 监视的描述符数量不受限制:它所支持的fd上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat/proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

  2. IO的效率不会随着监视fd的数量的增长而下降:

    • epoll不同于selectpoll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数ep_poll_callback()
    • ep_poll_callback()的调用时机是由被监听的fd的具体实现,比如socket或者某个设备驱动来决定的,因为等待队列头是他们持有的,epoll和当前进程只是单纯的等待。
  3. epoll使用一个文件描述符管理多个描述符:将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

2.4 epollselectpoll的比较

select poll epoll
数据结构 bitmap 数组 红黑树+链表
最大连接数 1024 无上限 无上限
fd拷贝 每次调用select拷贝 每次调用poll拷贝 首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝
工作效率 轮询:O(n) 轮询:O(n) 回调:O(1)

3 I/OManager实现

  言归正传,sylarI/O协程调度模块基于epoll实现,只支持Linux平台。对每个fdsylar支持两类事件,一类是可读事件,对应EPOLLIN,一类是可写事件,对应EPOLLOUTsylar的事件枚举值直接继承自epoll。当然epoll本身除了支持了EPOLLINEPOLLOUT两类事件外,还支持其他事件,比如EPOLLRDHUP, EPOLLERR, EPOLLHUP等,对于这些事件,sylar的做法是将其进行归类,分别对应到EPOLLINEPOLLOUT中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读及可写事件,比如EPOLLERR事件发生时,fd将同时触发可读和可写事件。

  对于IO协程调度来说,每次调度都包含一个三元组信息,分别是描述符-事件类型(可读或可写)-回调函数,调度器记录全部需要调度的三元组信息,其中描述符和事件类型用于epoll_wait,回调函数用于协程调度。这个三元组信息在源码上通过FdContext结构体来存储,在执行epoll_wait时通过epoll_event的私有数据指针data.ptr来保存FdContext结构体信息。IO协程调度器在idle时会epoll_wait所有注册的fd,如果有fd满足条件,epoll_wait返回,从私有数据中拿到fd的上下文信息,并且执行其中的回调函数。(实际是idle协程只负责收集所有已触发的fd的回调函数并将其加入调度器的任务队列,真正的执行时机是idle协程退出后,调度器在下一轮调度时执行)。与协程调度器不一样的是,IO协程调度器支持取消事件。取消事件表示不关心某个fd的某个事件了,如果某个fd的可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。

3.1 IOManager

sylarIO协程调度器对应IOManager,这个类直接继承自Scheduler

1
2
3
4
5
6
class IOManager : public Scheduler {
public:
typedef std::shared_ptr<IOManager> ptr;
typedef RWMutex RWMutexType;
...
}

首先是读写事件的定义,这里直接继承epoll的枚举值,如下:

1
2
3
4
5
6
7
/// @brief IO事件,继承自epoll_event对事件的定义
/// @details 这里只关心socket fd的读和写事件,其他epoll事件会归类到这两类事件中
enum Event {
NONE = 0x0, // 无事件
READ = 0x1, // 读事件(EPOLLIN)
WRITE = 0x4 // 写事件(EPOLLOUT)
};

3.2 FdContext结构体

接下来是对描述符-事件类型-回调函数三元组的定义,这个三元组也称为fd上下文,使用结构体FdContext来表示。由于fd有可读和可写两种事件,每种事件的回调函数也可以不一样,所以每个fd都需要保存两个事件类型-回调函数组合。FdContext结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// @brief Socket事件上下文
/// @details 每个socket fd都对应一个FdContext,包括fd的值,fd上的事件,以及fd的读写事件上下文
struct FdContext {
typedef Mutex MutexType;
/**
* @brief 事件上下文类
* @details fd的每个事件都有一个事件上下文,保存这个事件的回调函数以及执行回调函数的调度器
* sylar对fd事件做了简化,只预留了读事件和写事件,所有的事件都被归类到这两类事件中
*/
struct EventContext {
Scheduler* scheduler = nullptr; /// 执行事件回调的调度器
Fiber::ptr fiber; /// 事件回调协程
std::function<void()> cb; /// 事件回调函数
};

/// 获取事件上下文
EventContext& getEventContext(Event event);
/// 重置事件上下文
void resetEventContext(EventContext& ctx);
/// 触发事件, 根据事件类型调用对应上下文结构中的调度器去调度回调协程或回调函数
void triggerEvent(Event event);

EventContext read; /// 读事件上下文
EventContext write; /// 写事件上下文
int fd = 0; /// 事件关联的fd
Event events = NONE; ///该fd添加了哪些事件的回调函数,或者说该fd关心哪些事件
MutexType mutex; /// 事件上下文的锁
};

3.3 成员变量

IOManager包含一个epoll实例的句柄m_epfd以及用于tickle的一对pipe fd,还有全部的fd上下文m_fdContexts,如下:

1
2
3
4
5
int m_epfd = 0;                                 /// epoll文件句柄
int m_tickleFds[2]; /// pipe 文件句柄,fd[0]读端,fd[1]写端
std::atomic<size_t> m_pendingEventCount = {0}; /// 当前待处理的事件数量
RWMutexType m_mutex; /// 读写锁
std::vector<FdContext*> m_fdContexts; /// fd上下文数组

3.4 IOManager()

IOManager类的构造函数,接收三个参数,分别是线程数量,是否使用调度线程,以及调度器的名字。在构造函数中,首先创建epoll句柄,然后创建一个pipe,并将读端添加到epoll中,这个pipe的作用是用于唤醒epoll_wait,因为epoll_wait是阻塞的,如果没有事件发生,epoll_wait会一直阻塞,所以需要一个pipe来唤醒epoll_wait。最后调用contextResize函数初始化fd上下文数组,然后启动调度器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
: Scheduler(threads, use_caller, name) {
m_epfd = epoll_create(5000); // 创建epoll句柄, 参数为epoll监听的fd的数量
SYLAR_ASSERT(m_epfd > 0); // 断言创建成功

int rt = pipe(m_tickleFds); // 创建管道
SYLAR_ASSERT(!rt);

epoll_event event;
memset(&event, 0, sizeof(epoll_event));// 初始化epoll事件
event.events = EPOLLIN | EPOLLET; // 读事件,边缘触发
event.data.fd = m_tickleFds[0]; // 管道的读端

rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); // 设置非阻塞
SYLAR_ASSERT(!rt);

rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); // 添加管道读端的事件
SYLAR_ASSERT(!rt);

contextResize(32);
start();
}

3.5 ~IOManager()

IOManager类的析构函数,关闭epoll句柄和pipe,并且释放fd上下文数组。

1
2
3
4
5
6
7
8
9
10
IOManager::~IOManager() {
stop();
close(m_epfd);
close(m_tickleFds[0]);
close(m_tickleFds[1]);

for (size_t i = 0; i < m_fdContexts.size(); ++i) {
if (m_fdContexts[i]) delete m_fdContexts[i];
}
}

3.6 addEvent()

addEvent函数用于添加fd的事件,fd的事件类型,以及事件的回调函数。首先通过RWMutex加写锁,然后通过fd获取fd上下文,如果fd上下文不存在,则创建一个新的fd上下文,然后根据事件类型设置事件上下文的调度器和回调函数,最后将fd上下文的事件类型添加到fd上下文的事件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) {
// 初始化一个 FdContext
FdContext* fd_ctx = nullptr;
RWMutexType::ReadLock lock(m_mutex);
// 从 m_fdContexts 中拿到对应的 FdContext
if ((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
lock.unlock();
} else { // 如果 fd 对应的 FdContext 不存在,那么就需要扩容
lock.unlock();
RWMutexType::WriteLock lock2(m_mutex);
contextResize(fd * 1.5);
fd_ctx = m_fdContexts[fd];
}

// 一个句柄一般不会重复加同一个事件, 可能是两个不同的线程在操控同一个句柄添加事件
FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (SYLAR_UNLIKELY(fd_ctx -> events & event)) {
SYLAR_LOG_ERROR(g_logger) << "addEvent assert fd=" << fd
<< " event=" << (EPOLL_EVENTS)event
<< " fd_ctx.event=" << (EPOLL_EVENTS)fd_ctx -> events;
SYLAR_ASSERT(!(fd_ctx -> events & event));
}

// 若已经有注册的事件则为修改操作,若没有则为添加操作
int op = fd_ctx -> events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
epoll_event epevent;
epevent.events = EPOLLET | fd_ctx -> events | event; // 边缘触发,保留原有事件,添加新事件
epevent.data.ptr = fd_ctx; // 将fd_ctx存到data的指针中

// 注册事件
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events="
<< (EPOLL_EVENTS)fd_ctx->events;
return -1;
}

++m_pendingEventCount; // 增加待处理事件数量

fd_ctx -> events = (Event)(fd_ctx -> events | event); // 更新fd_ctx的事件
FdContext::EventContext& event_ctx = fd_ctx -> getEventContext(event); // 获取事件上下文
SYLAR_ASSERT(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb);// 断言事件上下文中的调度器,协程,回调函数都为空

event_ctx.scheduler = Scheduler::GetThis(); // 获得当前调度器
if (cb) event_ctx.cb.swap(cb); // 交换回调函数
else {
event_ctx.fiber = Fiber::GetThis(); // 获取当前协程
SYLAR_ASSERT2(event_ctx.fiber -> getState() == Fiber::RUNNING, "state=" << event_ctx.fiber -> getState());
}
return 0;

}

3.7 delEvent()

delEvent函数用于删除fd的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool IOManager::delEvent(int fd, Event event) {
// 找到fd对应的上下文 fdcontext
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdContexts.size() <= fd) return false; // 如果fd对应的上下文不存在,那么直接返回false

FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();

FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (SYLAR_UNLIKELY(!(fd_ctx -> events & event))) return false; // 如果若没有要删除的事件,直接返回false

// 清除指定的事件,表示不关心这个事件了,如果清除之后结果为0,则从epoll_wait中删除该文件描述符
Event new_events = (Event)(fd_ctx -> events & ~event); // 清除指定的事件
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; // 如果还有事件,那么就是修改事件,否则就是删除事件
epoll_event epevent;
epevent.events = EPOLLET | new_events; // 水平触发模式,新的注册事件
epevent.data.ptr = fd_ctx; // 将fd_ctx存到data的指针中

int rt = epoll_ctl(m_epfd, op, fd, &epevent); // 注册事件
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}

--m_pendingEventCount; // 减少待处理事件数量

fd_ctx -> events = new_events; // 更新事件
FdContext::EventContext& event_ctx = fd_ctx -> getEventContext(event); // 拿到对应事件的EventContext
fd_ctx -> resetEventContext(event_ctx); // 重置EventContext
return true;
}

3.8 cancelEvent()

cancelEvent函数用于取消fd的事件,取消事件表示不关心某个fd的某个事件了,如果某个fd的可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。

  • 取消事件会触发该事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bool IOManager::cancelEvent(int fd, Event event) {
// 找到fd对应的上下文
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdContexts.size() <= fd) return false;

FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();

FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (SYLAR_UNLIKELY(!(fd_ctx -> events & event))) return false;

// 清除指定的事件,表示不关心这个事件了
Event new_events = (Event)(fd_ctx -> events & ~event);
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;

int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}

fd_ctx->triggerEvent(event); // 清除之前触发一次事件
--m_pendingEventCount; // 减少待处理事件数量
return true;
}

3.9 cancelAll()

cancelAll函数用于取消fd的所有事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
bool IOManager::cancelAll(int fd) {
// 找到fd对应的上下文
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdContexts.size() <= fd) return false;

FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();

FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (!fd_ctx -> events) return false;

// 清除所有事件
int op = EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = 0;
epevent.data.ptr = fd_ctx;

int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}

// 触发全部已注册的事件
if (fd_ctx -> events & READ) {
fd_ctx -> triggerEvent(READ);
--m_pendingEventCount;
}
if (fd_ctx -> events & WRITE) {
fd_ctx -> triggerEvent(WRITE);
--m_pendingEventCount;
}

SYLAR_ASSERT(fd_ctx -> events == 0);
return true;
}

3.10 GetThis()

获得当前IO调度器:

1
2
3
IOManager* IOManager::GetThis() {
return dynamic_cast<IOManager*>(Scheduler::GetThis());
}

3.11 tickle()

通知调度协程、也就是Scheduler::run()idle中退出。IOManageridle协程每次从idle中退出之后,都会重新把任务队列里的所有任务执行完了再重新进入idle,如果没有调度线程处理于idle状态,那也就没必要发通知了。

1
2
3
4
5
6
7
void IOManager::tickle() {
SYLAR_LOG_DEBUG(g_logger) << "tickle";
if (!hasIdleThreads()) return;

int rt = write(m_tickleFds[1], "T", 1);
SYLAR_ASSERT(rt == 1);
}

3.12 idle()

重写Scheduleridle函数,调度器无调度任务时会阻塞idle协程上,对IO调度器而言,idle状态应该关注两件事,一是有没有新的调度任务,对应Schduler::schedule(),如果有新的调度任务,那应该立即退出idle状态,并执行对应的任务;二是关注当前注册的所有IO事件有没有触发,如果有触发,那么应该执行IO事件对应的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
void IOManager::idle() {
SYLAR_LOG_DEBUG(g_logger) << "idle";

// 一次epoll_wait最多检测256个就绪事件,如果就绪事件超过了这个数,那么会在下轮epoll_wati继续处理
const uint64_t MAX_EVENTS = 256;
epoll_event* events = new epoll_event[MAX_EVENTS]();
// 创建shared_ptr时,包括原始指针和自定义删除器,这样在shared_ptr析构时会调用自定义删除器,释放原始指针
std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr) { delete[] ptr; });

while(true) {
// 获取下一个定时器的超时时间,顺便判断调度器是否停止
uint64_t next_timeout = 0;
if(SYLAR_UNLIKELY(stopping(next_timeout))) {
SYLAR_LOG_DEBUG(g_logger) << "name=" << getName() << " idle stopping exit";
break;
}
// 阻塞在epoll_wait上,等待事件发生, 如果有定时器,那么就等到定时器超时时间
int rt = 0;
do {
// 默认超时时间5秒,如果下一个定时器的超时时间大于5秒,仍以5秒来计算超时,避免定时器超时时间太大时,epoll_wait一直阻塞
static const int MAX_TIMEOUT = 5000;
if(next_timeout != ~0ull) next_timeout = std::min((int)next_timeout, MAX_TIMEOUT);
else next_timeout = MAX_TIMEOUT;

rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);// 等待事件发生,返回发生的事件数量,-1表示出错,0表示超时
if(rt < 0 && errno == EINTR) continue; // 如果是中断,那么就继续等待
else break; // 否则,退出循环
} while(true);

std::vector<std::function<void()>> cbs;
listExpiredCb(cbs); // 获取所有已经超时的定时器的回调函数
if(!cbs.empty()) {
for(const auto& cb : cbs) {
schedule(cb); // 将所有已经超时的定时器的回调函数加入调度器
}
cbs.clear(); // 回调函数运行完了,清空
}

// 遍历所有发生的事件,根据epoll_event的私有指针找到对应的FdContext,进行事件处理
for(int i = 0; i < rt; ++i) {
epoll_event& event = events[i];
// 如果是管道读端的事件,那么就读取管道中的数据
if(event.data.fd == m_tickleFds[0]) {
uint8_t dummy[256];
while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0)
; // 读取管道中的数据,直到读完(read返回的字节数为0)
continue;
}

FdContext* fd_ctx = (FdContext*)event.data.ptr; // 获取fd对应的上下文
FdContext::MutexType::Lock lock(fd_ctx -> mutex); // 加锁

/**
* EPOLLERR: 出错,比如写读端已经关闭的pipe
* EPOLLHUP: 套接字对端关闭
* 出现这两种事件,应该同时触发fd的读和写事件,否则有可能出现注册的事件永远执行不到的情况
*/
if(event.events & (EPOLLERR | EPOLLHUP)) { // 如果是错误事件或者挂起事件,那么就触发读写事件
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx -> events;
}

int real_events = NONE; // 实际发生的事件
if(event.events & EPOLLIN) real_events |= READ; // 如果是读事件,那么就设置实际发生的事件为读事件
if(event.events & EPOLLOUT) real_events |= WRITE; // 如果是写事件,那么就设置实际发生的事件为写事件
if((fd_ctx -> events & real_events) == NONE) continue; // 如果实际发生的事件和注册的事件没有交集,那么就继续处理下一个事件

// 剔除已经发生的事件,将剩下的事件重新加入epoll_wait
int left_events = (fd_ctx -> events & ~real_events); // 计算剩余的事件
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; // 如果还有事件,那么就是修改事件,否则就是删除事件
event.events = EPOLLET | left_events; // 更新事件

int rt2 = epoll_ctl(m_epfd, op, fd_ctx -> fd, &event); // 对文件描述符 `fd_ctx -> fd` 执行操作 `op`,并将结果存储在 `rt2` 中。
if(rt2) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd_ctx -> fd << ", " << (EPOLL_EVENTS)event.events << "):"
<< rt2 << " (" << errno << ") (" << strerror(errno) << ")";
continue;
}

if(real_events & READ) {
fd_ctx -> triggerEvent(READ); // 触发读事件
--m_pendingEventCount; // 减少待处理事件数量
}
if(real_events & WRITE) {
fd_ctx -> triggerEvent(WRITE); // 触发写事件
--m_pendingEventCount; // 减少待处理事件数量
}
}

/**
* 一旦处理完所有的事件,idle协程yield,这样可以让调度协程(Scheduler::run)重新检查是否有新任务要调度
* 上面triggerEvent实际也只是把对应的fiber重新加入调度,要执行的话还要等idle协程退出
*/
Fiber::ptr cur = Fiber::GetThis(); // 获取当前协程
auto raw_ptr = cur.get(); // 获取当前协程的原始指针
cur.reset(); // 重置当前协程
raw_ptr -> yield(); // 让出当前协程的执行权
}
}

3.13 stopping()

重写Schedulerstopping函数,判断是否可以停止,同时获取最近一个定时器的超时时间,如果有定时器,那么就等到定时器超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
bool IOManager::stopping() {
uint64_t timeout = 0;
return stopping(timeout);
}

bool IOManager::stopping(uint64_t& timeout) {
// 对于IOManager而言,必须等所有待调度的IO事件都执行完了才可以退出
// 增加定时器功能后,还应该保证没有剩余的定时器待触发
timeout = getNextTimer();
// 如果没有定时器,没有待处理事件,调度器也在停止,那么就可以停止了
return timeout == ~0ull && m_pendingEventCount == 0 && Scheduler::stopping();
}

4 总结

  1. 总的来说,sylarIO协程调度模块可分为两部分,第一部分是对协程调度器的改造,将epoll与协程调度融合,重新实现tickleidle,并保证原有的功能不变。第二部分是基于epoll实现IO事件的添加、删除、调度、取消等功能。
  2. IO协程调度关注的是FdContext信息,也就是描述符-事件-回调函数三元组,IOManager需要保存所有关注的三元组,并且在epoll_wait检测到描述符事件就绪时执行对应的回调函数。
  3. epoll是线程安全的,即使调度器有多个调度线程,它们也可以共用同一个epoll实例,而不用担心互斥。由于空闲时所有线程都阻塞的epoll_wait上,所以也不用担心CPU占用问题。
  4. addEvent是一次性的,比如说,注册了一个读事件,当fd可读时会触发该事件,但触发完之后,这次注册的事件就失效了,后面fd再次可读时,并不会继续执行该事件回调,如果要持续触发事件的回调,那每次事件处理完都要手动再addEvent。这样在应对fdWRITE事件时会比较好处理,因为fd可写是常态,如果注册一次就一直有效,那么可写事件就必须在执行完之后就删除掉。
  5. cancelEventcancelAll都会触发一次事件,但delEvent不会。
  6. FdContext的寻址问题,sylar直接使用fd的值作为FdContext数组的下标,这样可以快速找到一个fd对应的FdContext。由于关闭的fd会被重复利用,所以这里也不用担心FdContext数组膨胀太快,或是利用率低的问题。
  7. IO协程调度器的退出,不但所有协程要完成调度,所有IO事件也要完成调度。
  8. sylarIO协程调度器应该配合非阻塞IO来使用,如果使用阻塞模式,可能会阻塞进程,参考为什么 IO 多路复用要搭配非阻塞 IO?