C++服务器框架:协程库——I/O协程调度
继承自协程调度器,封装了
epoll
,支持为socket fd
注册读写事件回调函数。
1 I/O
协程调度概述
I/O
事件调度功能对服务器开发⾄关重要,因为服务器通常需要处理⼤量来⾃客户端的socket fd
,使⽤I/O
事件调度可以将开发者从判断socket fd
是否可读或可写的⼯作中解放出来,使得程序员只需要关⼼socket fd
的I/O
操作,实现I/O
协程调度意义重⼤。
I/O
协程调度可以看成是增强版的协程调度。在前⾯的协程调度模块中,调度器对协程的调度是⽆条件执⾏的,在调度器已经启动调度的情况下,任务⼀旦添加成功,就会排队等待调度器执⾏。调度器不⽀持删除调度任务,并且调度器在正常退出之前⼀定会执⾏完全部的调度任务,所以在某种程度上可以认为,把⼀个协程添加到调度器的任务队列,就相当于调⽤了协程的resume
⽅法。
I/O
协程调度支持协程调度的全部功能,因为I/O
协程调度器是直接继承协程调度器实现的。除了协程调度,I/O
协程调度还增加了I/O
事件调度的功能,这个功能是针对描述符(一般是套接字描述符)的。I/O
协程调度支持为描述符注册可读和可写事件的回调函数,当描述符可读或可写时,执行对应的回调函数。(这里可以直接把回调函数等效成协程,所以这个功能被称为I/O
协程调度)。
很多的库都可以实现类似的工作,比如libevent
,libuv
,libev
等,这些库被称为异步事件库或异步I/O
库。有的库不仅可以处理socket fd
事件,还可以处理定时器事件和信号事件。这些事件库的实现原理基本类似,都是先将套接字设置成非阻塞状态,然后将套接字与回调函数绑定,接下来进入一个基于I/O
多路复用的事件循环,等待事件发生,然后调用对应的回调函数。
sylar
在协程调度模块的基础上,封装了epoll
。支持对I/O
事件的调度功能,可以为socket
句柄添加读事件(EPOLLIN
)和写事件(EPOLLOUT
),并且支持删除事件功能。I/OManager
主要通过FdContext
结构体存储文件描述符fd
, 注册的事件event
,执行任务cb/fiber
,其中fd
和event
用于epoll_wait
,cb/fiber
用于执行任务。当有任务时,使用管道pipe
来唤醒epoll_wait
先执行其他任务。
1.2 相关概念
在I/O
操作时,操作系统通常会将数据缓存在文件系统的页缓存(page cache
)中。这意味着数据的传输过程分为两个阶段:
- 第一阶段:数据首先被拷贝到操作系统内核的缓冲区中。
- 第二阶段:数据从操作系统内核的缓冲区拷贝到应用程序的地址空间。
1.2.1 阻塞I/O
在默认情况下,所有的socket
都是被阻塞的,也就是阻塞I/O
,这样会导致两个阶段的阻塞: - 进程可能需要等待数据到达。 - 数据从内核拷贝到用户空间时,进程也可能被阻塞。
1.2.2 非阻塞I/O
通过设置,可以将socket
变为非阻塞模式(non-blocking
),在非阻塞模式下:
- 如果内核还没有准备好数据,那么它不会阻塞用户进程,而是立即返回一个错误。
- 当内核准备好数据并且再次收到用户进程的
system call
时,数据会被拷贝到用户内存。然而,这个拷贝过程(第二阶段)仍然可能导致进程被阻塞。
1.2.3 异步I/O
异步I/O
(Asynchronous I/O
)允许在两个阶段都不会阻塞用户进程:
- 用户进程发起read操作后,内核收到system call会立即返回,不会阻塞用户进程。
- 当内核准备好数据并完成拷贝到用户空间后,内核会通过某种机制(如信号或回调函数)通知用户进程,整个过程中用户进程不会被阻塞。
异步I/O
是真正的异步操作,它在两个阶段都不会阻塞用户进程。阻塞I/O
在两个阶段都可能阻塞用户进程。非阻塞I/O
在第一阶段不阻塞用户进程,但在第二阶段拷贝数据时可能会阻塞。I/O
多路复用是同步I/O
的一种形式,它允许进程同时监视多个I/O
操作,但在I/O
操作就绪时,进程仍然需要被阻塞以处理这些操作。
2 I/O
多路复用
服务器需要与多个客户端建立连接时,会涉及处理大量的socket
文件描述符。为了有效管理这些文件描述符并进行I/O
操作,可以利用I/O多路复用技术。
当用户进程调用select
函数时,整个进程会被阻塞。同时,内核会监视所有由select
负责的socket
文件描述符。一旦任何一个socket
中的数据准备好了,select
就会返回,此时用户进程可以调用recvfrom
函数来接收数据。内核收到系统调用后将数据拷贝到用户进程中。
在Linux中,主要有三种常用的I/O
多路复用方式:select
、poll
和epoll
。通常情况下,将socket
设置为非阻塞模式(O_NONBLOCK
),这样在进行I/O
操作时,用户进程虽然仍然会被阻塞,但是是被select
函数阻塞,而不是被socket I/O
阻塞。
- 当
select
返回了,那一定是socket
中的数据准备好了,recvfrom
也不会阻塞了,所以设不设置socket
为非阻塞模式似乎没什么区别?
将socket
设置为非阻塞模式(使用O_NONBLOCK标志
)可以确保即使select
报告socket
可读,recvfrom
调用也不会阻塞。这是因为在非阻塞模式下,如果数据没有准备好,recvfrom
会立即返回一个错误,而不是等待数据。此外,在Linux
下,select
可能会将socket
报告为“准备读取”,即使实际上并没有数据可读。这可能是由于多种原因,包括但不限于网络条件、socket
状态的变化,或者是内核内部的实现细节。因此,即使select
返回,也不能保证recvfrom
调用一定会成功。最后,在不应阻塞的socket上使用非阻塞模式(O_NONBLOCK)可以避免潜在的阻塞,确保应用程序能够继续响应其他事件或进行其他操作。
2.1 select
1 | /* |
当进程调用select
时将会被阻塞,fd_set
的数据结构为bitmap
,通过FD_SET
方法将需要监听的文件描述符集合fdset
对应的bitmap
置为1(例如文件描述符集合为4
,9
,那么就将bitmap
的第4
位和第9
位置为1
),select
会截取bitmap
前n
位进行监听。select
会将需要关注的fd_set
拷贝到内核态监听,当有数据来时,内核将有数据的fd_set
置位(bitmap
对应的文件描述符置位为相应的操作,读、写、异常),select
返回。因为不知道是哪个文件描述符来数据了,所以再遍历fdset
寻找就绪的文件描述符。
select
目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
select
的缺点: 1. 单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024
,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。 2. fd_set
是不可重用的,每次需要使用FD_ZERO
方法清空。 3. 每次调用select
都需要将fd_set
拷贝到内核态,有开销。 4. 不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为O(n)
。
2.2 poll
1 | /* |
poll
数据结构与select
不同,poll
采用数组存储pollfd
,并将fd
和关注的事件(POLLIN
等)分别保存到pollfd
的fd
和events
中。
poll
与select
工作原理相同,但要注意的是,当数据来时,poll
将revents
置位(POLLIN
等),然后poll
函数返回。仍然要遍历数组来看是哪个文件描述符来了,并且将revents
置为0
,这样就能重复使用pollfd
。
poll
优点:
- 解决了
select
的1024
上限。 - 解决了
select
fd_set
不可重用,pollfd
可以通过重置revents
恢复如初。
poll
缺点:
- 每次调用
poll
都需要将pollfd
拷贝到内核态,有开销。 - 不知道是哪个文件描述符的数据来了,所以要遍历,时间复杂度为
O(n)
。
2.3 epoll
epoll
是在2.6
内核中提出的,是之前的select
和poll
的增强版本。相对于select
和poll
来说,epoll
更加灵活,没有描述符限制。epoll
使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy
只需一次。
epoll
通过以下3
个接口操作:
更多详细内容可以查看深度Linux - epoll性能那么高,为什么?
2.3.1 epoll_create
1 | //创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 |
创建一个epoll
的句柄,size
用来告诉内核这个监听的数目一共有多大,这个参数不同于select()
中的第一个参数,给出最大监听的fd+1
的值,参数size
并不是限制了epoll
所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。但看了源码,只要size
大于0
就可以了,没有实质性的作用。
当创建好epoll
句柄后,它就会占用一个fd
值,在Linux
下如果查看/proc/进程id/fd/
,是能够看到这个fd
的,所以在使用完epoll
后,必须调用close()
关闭,否则可能导致fd
被耗尽。
通过源码得知,每创建一个epollfd
,内核就会分配一个eventpoll
结构体与之对应,其中维护了一个RBTree
来存放所有要监听的struct epitem
(表示一个被监听的fd
)。
2.3.2 epoll_ctl
从用户空间将epoll_event
结构copy
到内核空间
1 | /* |
events
可以是以下几个宏的集合: - EPOLLIN
:表示对应的文件描述符可以读(包括对端SOCKET
正常关闭); - EPOLLOUT
:表示对应的文件描述符可以写; - EPOLLPRI
:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); - EPOLLERR
:表示对应的文件描述符发生错误; - EPOLLHUP
:表示对应的文件描述符被挂断; - EPOLLET
: 将EPOLL
设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 - EPOLLONESHOT
:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket
的话,需要再次把这个socket
加入到EPOLL
队列里。
通过源码得知,同一个fd
不能重复添加。内核会自动添加这两个事件epds.events |= POLLERR | POLLHUP
;并且使用copy_from_user
从用户空间将epoll_event
结构copy
到内核空间。
1 | if (ep_op_has_event(op) && |
2.3.3 epoll_wait
1 | /* |
收集在epoll
监控的事件中已经发生的事件,如果epoll
中没有任何一个事件发生,则最多等待timeout
毫秒后返回。epoll_wait
的返回值表示当前发生的事件个数,如果返回0
,则表示本次调用中没有事件发生,如果返回-1
,则表示发生错误,需要检查errno
判断错误类型。
通过源码得知,通过__put_user
将数据从内核空间拷贝到用户空间。
1 | if (__put_user(revents, &uevent->events) || |
2.3.4 epoll
工作模式
epoll
有两种工作模式,LT
(水平触发)模式与ET
(边缘触发)模式。默认情况下,epoll
采用LT
模式工作。两个的区别是:
Level_triggered(水平触发)
:当被监控的文件描述符上有可读写事件发生时,epoll_wait
会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用epoll_wait
时,它还会通知在没读写完的文件描述符上继续读写,当然如果一直不去读写,会一直通知。如果系统中有大量不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。Edge_triggered(边缘触发)
:当被监控的文件描述符上有可读写事件发生时,epoll_wait
会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait
时,它不会通知,也就是它只会通知一次,直到该文件描述符上出现第二次可读写事件才会再次通知。这种模式比水平触发效率高,系统不会充斥大量用户不关心的就绪文件描述符。
在LT
模式下开发基于epoll
的应用要简单一些,不太容易出错,而在ET
模式下事件发生时,如果没有彻底地将缓冲区的数据处理完,则会导致缓冲区的用户请求得不到响应。注意,默认情况下Nginx
采用ET
模式使用epoll
的。
2.3.5 epoll
优点
监视的描述符数量不受限制:它所支持的
fd
上限是最大可以打开文件的数目,这个数字一般远大于2048
,举个例子,在1GB
内存的机器上大约是10
万左右,具体数目可以cat/proc/sys/fs/file-max
察看,一般来说这个数目和系统内存关系很大。IO
的效率不会随着监视fd
的数量的增长而下降:epoll
不同于select
和poll
轮询的方式,而是通过每个fd
定义的回调函数来实现的。只有就绪的fd
才会执行回调函数ep_poll_callback()
。ep_poll_callback()
的调用时机是由被监听的fd
的具体实现,比如socket
或者某个设备驱动来决定的,因为等待队列头是他们持有的,epoll
和当前进程只是单纯的等待。
epoll
使用一个文件描述符管理多个描述符:将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy
只需一次。
2.4 epoll
与select
、poll
的比较
select |
poll |
epoll |
|
---|---|---|---|
数据结构 | bitmap |
数组 | 红黑树+链表 |
最大连接数 | 1024 |
无上限 | 无上限 |
fd 拷贝 |
每次调用select拷贝 | 每次调用poll 拷贝 |
首次调用epoll_ctl 拷贝,每次调用epoll_wait 不拷贝 |
工作效率 | 轮询:O(n) |
轮询:O(n) |
回调:O(1) |
3 I/OManager
实现
言归正传,sylar
的I/O
协程调度模块基于epoll
实现,只支持Linux
平台。对每个fd
,sylar
支持两类事件,一类是可读事件,对应EPOLLIN
,一类是可写事件,对应EPOLLOUT
,sylar
的事件枚举值直接继承自epoll
。当然epoll
本身除了支持了EPOLLIN
和EPOLLOUT
两类事件外,还支持其他事件,比如EPOLLRDHUP
, EPOLLERR
, EPOLLHUP
等,对于这些事件,sylar
的做法是将其进行归类,分别对应到EPOLLIN
和EPOLLOUT
中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读及可写事件,比如EPOLLERR
事件发生时,fd
将同时触发可读和可写事件。
对于IO
协程调度来说,每次调度都包含一个三元组信息,分别是描述符-事件类型(可读或可写)-回调函数,调度器记录全部需要调度的三元组信息,其中描述符和事件类型用于epoll_wait
,回调函数用于协程调度。这个三元组信息在源码上通过FdContext
结构体来存储,在执行epoll_wait
时通过epoll_event
的私有数据指针data.ptr
来保存FdContext
结构体信息。IO
协程调度器在idle
时会epoll_wait
所有注册的fd
,如果有fd
满足条件,epoll_wait
返回,从私有数据中拿到fd
的上下文信息,并且执行其中的回调函数。(实际是idle
协程只负责收集所有已触发的fd
的回调函数并将其加入调度器的任务队列,真正的执行时机是idle
协程退出后,调度器在下一轮调度时执行)。与协程调度器不一样的是,IO
协程调度器支持取消事件。取消事件表示不关心某个fd
的某个事件了,如果某个fd
的可读或可写事件都被取消了,那这个fd
会从调度器的epoll_wait
中删除。
3.1 IOManager
类
sylar
的IO
协程调度器对应IOManager
,这个类直接继承自Scheduler
:
1 | class IOManager : public Scheduler { |
首先是读写事件的定义,这里直接继承epoll
的枚举值,如下: 1
2
3
4
5
6
7/// @brief IO事件,继承自epoll_event对事件的定义
/// @details 这里只关心socket fd的读和写事件,其他epoll事件会归类到这两类事件中
enum Event {
NONE = 0x0, // 无事件
READ = 0x1, // 读事件(EPOLLIN)
WRITE = 0x4 // 写事件(EPOLLOUT)
};
3.2 FdContext
结构体
接下来是对描述符-事件类型-回调函数三元组的定义,这个三元组也称为fd上下文,使用结构体FdContext
来表示。由于fd
有可读和可写两种事件,每种事件的回调函数也可以不一样,所以每个fd
都需要保存两个事件类型-回调函数组合。FdContext
结构体定义如下:
1 | /// @brief Socket事件上下文 |
3.3 成员变量
IOManager
包含一个epoll
实例的句柄m_epfd
以及用于tickle
的一对pipe fd
,还有全部的fd
上下文m_fdContexts
,如下:
1 | int m_epfd = 0; /// epoll文件句柄 |
3.4 IOManager()
IOManager
类的构造函数,接收三个参数,分别是线程数量,是否使用调度线程,以及调度器的名字。在构造函数中,首先创建epoll
句柄,然后创建一个pipe
,并将读端添加到epoll
中,这个pipe
的作用是用于唤醒epoll_wait
,因为epoll_wait
是阻塞的,如果没有事件发生,epoll_wait
会一直阻塞,所以需要一个pipe
来唤醒epoll_wait
。最后调用contextResize
函数初始化fd
上下文数组,然后启动调度器。
1 | IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) |
3.5 ~IOManager()
IOManager
类的析构函数,关闭epoll
句柄和pipe
,并且释放fd
上下文数组。
1 | IOManager::~IOManager() { |
3.6 addEvent()
addEvent
函数用于添加fd
的事件,fd
的事件类型,以及事件的回调函数。首先通过RWMutex
加写锁,然后通过fd
获取fd
上下文,如果fd
上下文不存在,则创建一个新的fd
上下文,然后根据事件类型设置事件上下文的调度器和回调函数,最后将fd
上下文的事件类型添加到fd
上下文的事件中。
1 | int IOManager::addEvent(int fd, Event event, std::function<void()> cb) { |
3.7 delEvent()
delEvent
函数用于删除fd
的事件。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33bool IOManager::delEvent(int fd, Event event) {
// 找到fd对应的上下文 fdcontext
RWMutexType::ReadLock lock(m_mutex);
if ((int)m_fdContexts.size() <= fd) return false; // 如果fd对应的上下文不存在,那么直接返回false
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx -> mutex);
if (SYLAR_UNLIKELY(!(fd_ctx -> events & event))) return false; // 如果若没有要删除的事件,直接返回false
// 清除指定的事件,表示不关心这个事件了,如果清除之后结果为0,则从epoll_wait中删除该文件描述符
Event new_events = (Event)(fd_ctx -> events & ~event); // 清除指定的事件
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; // 如果还有事件,那么就是修改事件,否则就是删除事件
epoll_event epevent;
epevent.events = EPOLLET | new_events; // 水平触发模式,新的注册事件
epevent.data.ptr = fd_ctx; // 将fd_ctx存到data的指针中
int rt = epoll_ctl(m_epfd, op, fd, &epevent); // 注册事件
if (rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}
--m_pendingEventCount; // 减少待处理事件数量
fd_ctx -> events = new_events; // 更新事件
FdContext::EventContext& event_ctx = fd_ctx -> getEventContext(event); // 拿到对应事件的EventContext
fd_ctx -> resetEventContext(event_ctx); // 重置EventContext
return true;
}
3.8 cancelEvent()
cancelEvent
函数用于取消fd
的事件,取消事件表示不关心某个fd
的某个事件了,如果某个fd
的可读或可写事件都被取消了,那这个fd
会从调度器的epoll_wait
中删除。
- 取消事件会触发该事件。
1 | bool IOManager::cancelEvent(int fd, Event event) { |
3.9 cancelAll()
cancelAll
函数用于取消fd
的所有事件。
1 | bool IOManager::cancelAll(int fd) { |
3.10 GetThis()
获得当前IO
调度器:
1 | IOManager* IOManager::GetThis() { |
3.11 tickle()
通知调度协程、也就是Scheduler::run()
从idle
中退出。IOManager
的idle
协程每次从idle
中退出之后,都会重新把任务队列里的所有任务执行完了再重新进入idle
,如果没有调度线程处理于idle
状态,那也就没必要发通知了。
1 | void IOManager::tickle() { |
3.12 idle()
重写Scheduler
的idle
函数,调度器无调度任务时会阻塞idle
协程上,对IO
调度器而言,idle
状态应该关注两件事,一是有没有新的调度任务,对应Schduler::schedule()
,如果有新的调度任务,那应该立即退出idle
状态,并执行对应的任务;二是关注当前注册的所有IO
事件有没有触发,如果有触发,那么应该执行IO
事件对应的回调函数。
1 | void IOManager::idle() { |
3.13 stopping()
重写Scheduler
的stopping
函数,判断是否可以停止,同时获取最近一个定时器的超时时间,如果有定时器,那么就等到定时器超时时间。
1 | bool IOManager::stopping() { |
4 总结
- 总的来说,
sylar
的IO
协程调度模块可分为两部分,第一部分是对协程调度器的改造,将epoll
与协程调度融合,重新实现tickle
和idle
,并保证原有的功能不变。第二部分是基于epoll
实现IO
事件的添加、删除、调度、取消等功能。 IO
协程调度关注的是FdContext
信息,也就是描述符-事件-回调函数三元组,IOManager
需要保存所有关注的三元组,并且在epoll_wait
检测到描述符事件就绪时执行对应的回调函数。epoll
是线程安全的,即使调度器有多个调度线程,它们也可以共用同一个epoll
实例,而不用担心互斥。由于空闲时所有线程都阻塞的epoll_wait
上,所以也不用担心CPU
占用问题。addEvent
是一次性的,比如说,注册了一个读事件,当fd
可读时会触发该事件,但触发完之后,这次注册的事件就失效了,后面fd
再次可读时,并不会继续执行该事件回调,如果要持续触发事件的回调,那每次事件处理完都要手动再addEvent
。这样在应对fd
的WRITE
事件时会比较好处理,因为fd
可写是常态,如果注册一次就一直有效,那么可写事件就必须在执行完之后就删除掉。cancelEvent
和cancelAll
都会触发一次事件,但delEvent
不会。FdContext
的寻址问题,sylar
直接使用fd
的值作为FdContext
数组的下标,这样可以快速找到一个fd
对应的FdContext
。由于关闭的fd
会被重复利用,所以这里也不用担心FdContext
数组膨胀太快,或是利用率低的问题。IO
协程调度器的退出,不但所有协程要完成调度,所有IO
事件也要完成调度。sylar
的IO
协程调度器应该配合非阻塞IO
来使用,如果使用阻塞模式,可能会阻塞进程,参考为什么 IO 多路复用要搭配非阻塞 IO?