C++服务器框架:协程库——hook模块

hook模块封装了一些C标准库提供的APIsocket IO相关的API。能够使同步API实现异步的性能。

1. hook概述

1.1 什么是hook

  hook实际上就是对系统调用API进行一次封装,将其封装成一个与原始的系统调用API同名的接口,应用在调用这个接口时,会先执行封装中的操作,再执行原始的系统调用APIhook技术可以使应用程序在执行系统调用之前进行一些隐藏的操作,比如可以对系统提供malloc()free()进行hook,在真正进行内存分配和释放之前,统计内存的引用计数,以排查内存泄露问题。

  还可以用C++的子类重载来理解hook。在C++中,子类在重载父类的同名方法时,一种常见的实现方式是子类先完成自己的操作,再调用父类的操作,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

class Base {
public:
void Print() {
cout << "This is Base" << endl;
}
};

class Child : public Base {
public:
/// 子类重载时先实现自己的操作,再调用父类的操作
void Print() {
cout << "This is Child" << endl;
Base::Print();
}
};

  在上面的代码实现中,调用子类的Print方法,会先执行子类的语句,然后再调用父类的Print方法,这就相当于子类hook了父类的Print方法。由于hook之后的系统调用与原始的系统系统调用同名,所以对于程序开发者来说也很方便,不需要重新学习新的接口,只需要按老的接口调用惯例直接写代码就行了。

1.2 hook的功能

  hook的目的是在不重新编写代码的情况下,把老代码中的socket IO相关的API都转成异步,以提高性能。hookIO协程调度是密切相关的,如果不使用IO协程调度器,那hook没有任何意义,考虑IOManager要在一个线程上按顺序调度以下协程: 1. 协程1sleep(2) 睡眠两秒后返回。 2. 协程2:在scoket fd1send 100k数据。 3. 协程3:在socket fd2recv直到数据接收成功。

  在未hook的情况下,IOManager要调度上面的协程,流程是下面这样的: 1. 调度协程1,协程阻塞在sleep上,等2秒后返回,这两秒内调度线程是被协程1占用的,其他协程无法在当前线程上调度。 2. 调度协程2,协程阻塞send 100k数据上,这个操作一般问题不大,因为send数据无论如何都要占用时间,但如果fd迟迟不可写,那send会阻塞直到套接字可写,同样,在阻塞期间,其他协程也无法在当前线程上调度。 3. 调度协程3,协程阻塞在recv上,这个操作要直到recv超时或是有数据时才返回,期间调度器也无法调度其他协程。

  上面的调度流程最终总结起来就是,协程只能按顺序调度,一旦有一个协程阻塞住了,那整个调度线程也就阻塞住了,其他的协程都无法在当前线程上执行。像这种一条路走到黑的方式其实并不是完全不可避免,以sleep为例,调度器完全可以在检测到协程sleep后,将协程yield以让出执行权,同时设置一个定时器,2秒后再将协程重新resume。这样,调度器就可以在这2秒期间调度其他的任务,同时还可以顺利的实现sleep 2秒后再继续执行协程的效果,send/recv与此类似。在完全实现hook后,IOManager的执行流程将变成下面的方式:

  1. 调度协程1,检测到协程sleep,那么先添加一个2秒的定时器,定时器回调函数是在调度器上继续调度本协程,接着协程yield,等定时器超时。
  2. 因为上一步协程1已经yield了,所以协徎2并不需要等2秒后才可以执行,而是立刻可以执行。同样,调度器检测到协程send,由于不知道fd是不是马上可写,所以先在IOManager上给fd注册一个写事件,回调函数是让当前协程resume并执行实际的send操作,然后当前协程yield,等可写事件发生。
  3. 上一步协徎2也yield了,可以马上调度协程3。协程3与协程2类似,也是给fd注册一个读事件,回调函数是让当前协程resume并继续recv,然后本协程yield,等事件发生。
  4. 等2秒超时后,执行定时器回调函数,将协程1 resume以便继续执行。
  5. 等协程2fd可写,一旦可写,调用写事件回调函数将协程2 resume以便继续执行send
  6. 等协程3fd可读,一旦可读,调用回调函数将协程3 resume以便继续执行recv

  上面的456步都是异步的,调度线程并不会阻塞,IOManager仍然可以调度其他的任务,只在相关的事件发生后,再继续执行对应的任务即可。并且,由于hook的函数签名与原函数一样,所以对调用方也很方便,只需要以同步的方式编写代码,实现的效果却是异步执行的,效率很高。

  总而言之,在IO协程调度中对相关的系统调用进行hook,可以让调度线程尽可能得把时间片都花在有意义的操作上,而不是浪费在阻塞等待中。

  hook的重点是在替换API的底层实现的同时完全模拟其原本的行为,因为调用方是不知道hook的细节的,在调用被hookAPI时,如果其行为与原本的行为不一致,就会给调用方造成困惑。比如,所有的socket fd在进行IO调度时都会被设置成NONBLOCK模式,如果用户未显式地对fd设置NONBLOCK,那就要处理好fcntl,不要对用户暴露fd已经是NONBLOCK的事实,这点也说明,除了IO相关的函数要进行hook外,对fcntl, setsockopt之类的功能函数也要进行hook,才能保证API的一致性。

1.3 hook实现基础

  hook的实现机制非常简单,就是通过动态库的全局符号介入功能,用自定义的接口来替换掉同名的系统调用接口。由于系统调用接口基本上是由C标准函数库libc提供的,所以这里要做的事情就是用自定义的动态库来覆盖掉libc中的同名符号。

  由于动态库的全局符号介入问题,全局符号表只会记录第一次识别到的符号,后续的同名符号都被忽略,但这并不表示同名符号所在的动态库完全不会加载,因为有可能其他的符号会用到。以libc库举例,如果用户在链接libc库之前链接了一个指定的库,并且在这个库里实现了read/write接口,那么在程序运行时,程序调用的read/write接口就是指定库里的,而不是libc库里的。libc库仍然会被加载,因为libc库是程序的运行时库,程序不可能不依赖libc里的其他接口。因为libc库也被加载了,所以,通过一定的手段,仍然可以从libc中拿到属于libcread/write接口,这就为hook创建了条件。程序可以定义自己的read/write接口,在接口内部先实现一些相关的操作,然后再调用libc里的read/write接口。而将libc库中的接口重新找回来的方法就是使用dlsym()

1
2
3
4
5
6
#include <dlfcn.h>

/*
* 第一个参数固定为 RTLD_NEXT,第二个参数为符号的名称
*/
void *dlsym(void *handle, const char *symbol);

1.4 hook模块设计

  sylarhook功能是以线程为单位的,可以自由设置当前线程是否使用hook。默认情况下,协程调度器的调度线程会开启hook,而其他线程则不会开启。对以下和函数进行了hook,并且只对socket fd进行了hook,如果操作的不是socket fd,那会直接调用系统原本的API,而不是hook之后的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sleep
usleep
nanosleep
socket
connect
accept
read
readv
recv
recvfrom
recvmsg
write
writev
send
sendto
sendmsg
close
fcntl
ioctl
getsockopt
setsockopt
  除此外,sylar还增加了一个connect_with_timeout接口用于实现带超时的connect。为了管理所有的socket fdsylar设计了一个FdManager类来记录所有分配过的fd的上下文,这是一个单例类,每个socket fd上下文记录了当前fd的读写超时,是否设置非阻塞等信息。

  关于hook模块和IO协程调度的整合。一共有三类接口需要hook,如下:

  1. sleep延时系列接口,包括sleep/usleep/nanosleep。对于这些接口的hook,只需要给IO协程调度器注册一个定时事件,在定时事件触发后再继续执行当前协程即可。当前协程在注册完定时事件后即可yield让出执行权。
  2. socket IO系列接口,包括read/write/recv/send...等,connectaccept也可以归到这类接口中。这类接口的hook首先需要判断操作的fd是否是socket fd,以及用户是否显式地对该fd设置过非阻塞模式,如果不是socket fd或是用户显式设置过非阻塞模式,那么就不需要hook了,直接调用操作系统的IO接口即可。如果需要hook,那么首先在IO协程调度器上注册对应的读写事件,等事件发生后再继续执行当前协程。当前协程在注册完IO事件即可yield让出执行权。
  3. socket/fcntl/ioctl/close等接口,这类接口主要处理的是边缘情况,比如分配fd上下文,处理超时及用户显式设置非阻塞问题。

2. 模块实现

2.1 FdCtx

  FdCtx存储每一个fd相关的信息,并由FdManager管理每一个FdCtxFdManager为单例类。

  成员变量如下:

1
2
3
4
5
6
7
8
bool m_isInit: 1; /// 是否初始化
bool m_isSocket: 1; /// 是否socket
bool m_sysNonblock: 1; /// 是否hook非阻塞
bool m_userNonblock: 1; /// 是否用户主动设置非阻塞
bool m_isClosed: 1; /// 是否关闭
int m_fd; /// 文件句柄
uint64_t m_recvTimeout; /// 读超时时间毫秒
uint64_t m_sendTimeout; /// 写超时时间毫秒

2.1.1 FdCtx()

  构造函数,初始化FdCtx

1
2
3
4
5
6
7
8
9
10
11
FdCtx::FdCtx(int fd)
:m_isInit(false)
,m_isSocket(false)
,m_sysNonblock(false)
,m_userNonblock(false)
,m_isClosed(false)
,m_fd(fd)
,m_recvTimeout(-1)
,m_sendTimeout(-1) {
init();
}

2.1.2 init()

  初始化FdCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool FdCtx::init() {
// 初始化过了就直接返回
if(m_isInit) return true;
m_recvTimeout = -1;
m_sendTimeout = -1;

struct stat fd_stat; // return 0 成功取出;-1 失败
if(-1 == fstat(m_fd, &fd_stat)) {
m_isInit = false;
m_isSocket = false;
} else {
m_isInit = true;
// 判断是否为socket
m_isSocket = S_ISSOCK(fd_stat.st_mode);
}
// 如果是socket,设置非阻塞
if(m_isSocket) {
int flags = fcntl_f(m_fd, F_GETFL, 0);
// 如果用户没有设置非阻塞,设置非阻塞
if(!(flags & O_NONBLOCK)) fcntl_f(m_fd, F_SETFL, flags | O_NONBLOCK);
m_sysNonblock = true;
} else m_sysNonblock = false;

m_userNonblock = false;
m_isClosed = false;
return m_isInit;
}

2.1.3 setTimeout()

  设置fd的读写超时时间。

1
2
3
4
5
6
7
8
9
10
void FdCtx::setTimeout(int type, uint64_t v) {
// 如果是读超时,设置读超时时间
if(type == SO_RCVTIMEO) m_recvTimeout = v;
else m_sendTimeout = v;
}

uint64_t FdCtx::getTimeout(int type) {
if(type == SO_RCVTIMEO) return m_recvTimeout;
else return m_sendTimeout;
}

2.2 FdManager

  FdManager是一个单例类,用于管理所有的FdCtx

  成员变量如下:

1
2
RWMutexType m_mutex; /// 读写锁
std::vector<FdCtx::ptr> m_datas;/// 文件句柄集合

2.2.1 FdManager()

  构造函数。

1
2
3
FdManager::FdManager() {
m_datas.resize(64);
}

2.2.2 get()

  获取或创建fd的上下文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
FdCtx::ptr FdManager::get(int fd, bool auto_create) {
if(fd == -1) return nullptr;
// 集合中没有,并且不自动创建,返回nullptr
RWMutexType::ReadLock lock(m_mutex);
if((int)m_datas.size() <= fd) {
if(!auto_create) return nullptr;
} else {
// 集合中有,直接返回
if(m_datas[fd] || !auto_create) return m_datas[fd];
}
lock.unlock();

RWMutexType::WriteLock lock2(m_mutex);
// 创建新的FdCtx
FdCtx::ptr ctx(new FdCtx(fd));
// fd比集合下标大,需要扩容
if(fd >= (int)m_datas.size()) m_datas.resize(fd * 1.5);
// 将新的FdCtx放入集合中
m_datas[fd] = ctx;
return ctx;
}

2.2.3 del()

  删除fd的上下文。

1
2
3
4
5
void FdManager::del(int fd) {
RWMutexType::WriteLock lock(m_mutex);
if((int)m_datas.size() <= fd) return;
m_datas[fd].reset();
}

2.3 hook模块实现

  将函数接口都存放到extern "C"作用域下,指定函数按照C语言的方式进行编译和链接。它的作用是为了解决C++中函数名重载的问题,使得C++代码可以和C语言代码进行互操作。

2.3.1 定义接口函数指针

  定义hook模块的接口函数指针,只列举部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// sleep_fun 为函数指针
typedef unsigned int (*sleep_fun)(unsigned int seconds);
// 它是一个sleep_fun类型的函数指针变量,表示该变量在其他文件中已经定义,我们只是在当前文件中引用它。
extern sleep_fun sleep_f;

typedef int (*socket_fun)(int domain, int type, int protocol);
extern socket_fun socket_f;

typedef ssize_t(*read_fun)(int fd, void* buf, size_t count);
extern read_fun read_f;

typedef ssize_t (*write_fun)(int fd, const void *buf, size_t count);
extern write_fun write_f;
...

2.3.2 获取接口原始地址

  使用宏来封装对每个原始接口地址的获取。将hook_init()封装到一个结构体的构造函数中,并创建静态对象,能够在main函数运行之前就能将地址保存到函数指针变量当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#define HOOK_FUN(XX) \
XX(sleep) \
XX(usleep) \
XX(nanosleep) \
XX(socket) \
XX(connect) \
XX(accept) \
XX(read) \
XX(readv) \
XX(recv) \
XX(recvfrom) \
XX(recvmsg) \
XX(write) \
XX(writev) \
XX(send) \
XX(sendto) \
XX(sendmsg) \
XX(close) \
XX(fcntl) \
XX(ioctl) \
XX(getsockopt) \
XX(setsockopt)

void hook_init() {
static bool is_inited = false;
if(is_inited) {
return;
}
// dlsym - 从一个动态链接库或者可执行文件中获取到符号地址。成功返回跟name关联的地址
// RTLD_NEXT 返回第一个匹配到的 "name" 的函数地址
// 取出原函数,赋值给新函数
#define XX(name) name ## _f = (name ## _fun)dlsym(RTLD_NEXT, #name);
HOOK_FUN(XX);
#undef XX
}

// 声明变量
extern "C" {
#define XX(name) name ## _fun name ## _f = nullptr;
HOOK_FUN(XX);
#undef XX
}

宏展开如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
extern "C" {
sleep_fun sleep_f = nullptr;
usleep_fun usleep_f = nullptr;
.....
setsocketopt_fun setsocket_f = nullptr;
}

void hook_init() {
static bool is_inited = false;
if (is_inited) {
return;
}

sleep_f = (sleep_fun)dlsym(RTLD_NEXT, "sleep");
usleep_f = (usleep_fun)dlsym(RTLD_NEXT, "usleep");
...
setsocketopt_f = (setsocketopt_fun)dlsym(RTLD_NEXT, "setsocketopt");
}

2.3.3 set_hook_enable()

  设置是否hook,定义线程局部变量,来控制是否开启hook

1
2
3
4
5
static thread_local bool t_hook_enable = false;

void set_hook_enable(bool flag) {
t_hook_enable = flag;
}

2.3.4 is_hook_enable()

  获取是否hook

1
2
3
bool is_hook_enable() {
return t_hook_enable;
}

2.3.5 do_io()

  IO操作,hook的核心函数。需要注意的是,这段代码使用了模板和可变参数,可以适用于不同类型的IO操作,能够以写同步的方式实现异步的效果。该函数的主要思想如下: 1. 先进行一系列判断,是否按原函数执行。 2. 执行原始函数进行操作,若errno = EINTR,则为系统中断,应该不断重新尝试操作。 3. 若errno = EAGIN,系统已经隐式的将socket设置为非阻塞模式,此时资源咱不可用。 4. 若设置了超时时间,则设置一个执行周期为超时时间的条件定时器,它保证若在超时之前数据就已经来了,然后操作完do_io执行完毕,智能指针tinfo已经销毁了,但是定时器还在,此时弱指针拿到的就是个空指针,将不会执行定时器的回调函数。 5. 在条件定时器的回调函数中设置错误为ETIMEDOUT超时,并且使用cancelEvent强制执行该任务,继续回到该协程执行。 6. 通过addEvent添加事件,若添加事件失败,则将条件定时器删除并返回错误。成功则让出协程执行权。 7. 只有两种情况协程会被拉起: - 超时了,通过定时器回调函数 cancelEvent ---> triggerEvent 会唤醒回来 - addEvent数据回来了会唤醒回来 8. 将定时器取消,若为超时则返回-1并设置errno = ETIMEDOUT,并返回-1。 9. 若为数据来了则retry,重新操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
template<typename OriginFun, typename... Args>
static ssize_t do_io(int fd, OriginFun fun, const char* hook_fun_name,
uint32_t event, int timeout_so, Args&&... args) {
// 如果不hook,直接返回原接口
if(!sylar::t_hook_enable) {
// forward完美转发,可以将传入的可变参数args以原始类型的方式传递给函数fun。
// 这样做的好处是可以避免不必要的类型转换和拷贝,提高代码的效率和性能。
return fun(fd, std::forward<Args>(args)...);
}
// 获取fd对应的FdCtx
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance() -> get(fd);
if(!ctx) { // 没有文件
return fun(fd, std::forward<Args>(args)...);
}
// 检查句柄是否关闭
if(ctx -> isClose()) {
errno = EBADF; // 错误码
return -1;
}
// 如果不是socket或者用户设置了非阻塞,直接调用原始函数
if(!ctx -> isSocket() || ctx -> getUserNonblock()) {
return fun(fd, std::forward<Args>(args)...);
}

uint64_t to = ctx -> getTimeout(timeout_so); // 获取超时时间
std::shared_ptr<timer_info> tinfo(new timer_info);// 设置超时条件

retry:
// 先调用原始函数读数据或写数据 若函数返回值有效就直接返回
ssize_t n = fun(fd, std::forward<Args>(args)...);
SYLAR_LOG_DEBUG(sylar::g_logger) << "do_io <" << hook_fun_name << ">" << " n = " << n;
// 若中断则重试
while(n == -1 && errno == EINTR) {
n = fun(fd, std::forward<Args>(args)...);
}
// 若为阻塞状态
if(n == -1 && errno == EAGAIN) {
sylar::IOManager* iom = sylar::IOManager::GetThis();// 获取IOManager
sylar::Timer::ptr timer; // 定时器
std::weak_ptr<timer_info> winfo(tinfo); // tinfo的弱指针,可以判断tinfo是否已经销毁
// 若超时时间不为-1,则设置定时器
if(to != (uint64_t)-1) {
// 添加条件定时器 —— to时间消息还没来就触发callback
timer = iom -> addConditionTimer(to, [winfo, fd, iom, event]() {
auto t = winfo.lock();
// tinfo失效 || 设了错误 定时器失效了
if(!t || t -> cancelled) return;

t -> cancelled = ETIMEDOUT; // 没错误的话设置为超时而失败
// 取消事件强制唤醒
iom -> cancelEvent(fd, (sylar::IOManager::Event)(event));
}, winfo);
}
// 添加事件
int rt = iom -> addEvent(fd, (sylar::IOManager::Event)(event));
// 添加事件失败
if(SYLAR_UNLIKELY(rt)) {
SYLAR_LOG_ERROR(g_logger) << hook_fun_name << " addEvent("
<< fd << ", " << event << ")";
if(timer) timer -> cancel(); // 取消定时器
return -1;
} else {
/* addEvent成功,把执行时间让出来
* 只有两种情况会从这回来:
* 1) 超时了, timer cancelEvent triggerEvent会唤醒回来
* 2) addEvent数据回来了会唤醒回来 */
sylar::Fiber::GetThis() -> yield();
if(timer) timer -> cancel(); // 回来了还有定时器就取消掉

if(tinfo -> cancelled) { // 从定时任务唤醒,超时失败
errno = tinfo -> cancelled;
return -1;
}
// 数据来了就直接重新去操作
goto retry;
}
}

return n;
}

2.3.6 sleep()

  设置一个定时器然后让出执行权,超时后继续执行该协程。回调函数使用std::bind函数将sylar::IOManager::schedule函数绑定到iom对象上,并传入fiber-1两个参数。由于schedule是个模板类,如果直接与函数绑定,就无法确定函数的类型,从而无法使用std::bind函数。因此,需要先声明函数指针,将函数的类型确定下来,然后再将函数指针与std::bind函数进行绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
unsigned int sleep(unsigned int seconds) {
if(!sylar::t_hook_enable) {
return sleep_f(seconds);
}

sylar::Fiber::ptr fiber = sylar::Fiber::GetThis();
sylar::IOManager* iom = sylar::IOManager::GetThis();
/*
* (void(sylar::Scheduler::*)(sylar::Fiber::ptr, int thread)) 是一个函数指针类型,
* 它定义了一个指向 sylar::Scheduler 类中一个参数为 sylar::Fiber::ptr 和 int 类型的成员函数的指针类型。
* 具体来说,它的含义如下:
* void 表示该成员函数的返回值类型,这里是 void 类型。
* (sylar::Scheduler::*) 表示这是一个 sylar::Scheduler 类的成员函数指针类型。
* (sylar::Fiber::ptr, int thread) 表示该成员函数的参数列表
* ,其中第一个参数为 sylar::Fiber::ptr 类型,第二个参数为 int 类型。
*
* 使用 std::bind 绑定了 sylar::IOManager::schedule 函数,
* 并将 iom 实例作为第一个参数传递给了 std::bind 函数,将sylar::IOManager::schedule函数绑定到iom对象上。
* 在这里,第二个参数使用了函数指针类型 (void(sylar::Scheduler::*)(sylar::Fiber::ptr, int thread))
* ,表示要绑定的函数类型是 sylar::Scheduler 类中一个参数为 sylar::Fiber::ptr 和 int 类型的成员函数
* ,这样 std::bind 就可以根据这个函数类型来实例化出一个特定的函数对象,并将 fiber 和 -1 作为参数传递给它。
*/
iom -> addTimer(seconds * 1000, std::bind((void(sylar::Scheduler::*)
(sylar::Fiber::ptr, int thread))&sylar::IOManager::schedule
,iom, fiber, -1));
sylar::Fiber::GetThis() -> yield();
return 0;
}

2.3.7 socket()

  socket函数,并将fd放入到文件管理中。

1
2
3
4
5
6
7
8
9
10
int socket(int domain, int type, int protocol) {
if(!sylar::t_hook_enable) {
return socket_f(domain, type, protocol);
}
int fd = socket_f(domain, type, protocol);
if(fd == -1) return fd;
// 将fd放入到文件管理中
sylar::FdMgr::GetInstance() -> get(fd, true);
return fd;
}

2.3.8 connect()

  socket连接,和do_io思路差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
int connect_with_timeout(int fd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout_ms) {
if(!sylar::t_hook_enable) {
return connect_f(fd, addr, addrlen);
}
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance() -> get(fd);
if(!ctx || ctx -> isClose()) {
errno = EBADF;
return -1;
}

if(!ctx -> isSocket()) {
return connect_f(fd, addr, addrlen);
}

if(ctx -> getUserNonblock()) {
return connect_f(fd, addr, addrlen);
}

// ----- 异步开始 -----
// 先尝试连接
int n = connect_f(fd, addr, addrlen);
// 连接成功
if(n == 0) {
return 0;
// 其他错误,EINPROGRESS表示连接操作正在进行中
} else if(n != -1 || errno != EINPROGRESS) {
return n;
}

sylar::IOManager* iom = sylar::IOManager::GetThis();
sylar::Timer::ptr timer;
std::shared_ptr<timer_info> tinfo(new timer_info);
std::weak_ptr<timer_info> winfo(tinfo);

// 设置了超时时间
if(timeout_ms != (uint64_t)-1) {
// 添加条件定时器
timer = iom -> addConditionTimer(timeout_ms, [winfo, fd, iom]() {
auto t = winfo.lock();
if(!t || t -> cancelled) {
return;
}
t -> cancelled = ETIMEDOUT;
iom -> cancelEvent(fd, sylar::IOManager::WRITE);
}, winfo);
}
// 添加一个写事件
int rt = iom -> addEvent(fd, sylar::IOManager::WRITE);
if(rt == 0) {
/* 只有两种情况唤醒:
* 1. 超时,从定时器唤醒
* 2. 连接成功,从epoll_wait拿到事件 */
sylar::Fiber::GetThis() -> yield();
if(timer) {
timer -> cancel();
}
// 从定时器唤醒,超时失败
if(tinfo -> cancelled) {
errno = tinfo -> cancelled;
return -1;
}
} else { // 添加事件失败
if(timer) {
timer -> cancel();
}
SYLAR_LOG_ERROR(g_logger) << "connect addEvent(" << fd << ", WRITE) error";
}

int error = 0;
socklen_t len = sizeof(int);
// 获取套接字的错误状态
if(-1 == getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len)) {
return -1;
}
// 没有错误,连接成功
if(!error) {
return 0;
// 有错误,连接失败
} else {
errno = error;
return -1;
}
}

int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
return connect_with_timeout(sockfd, addr, addrlen, sylar::s_connect_timeout);
}

2.3.9 close()

  关闭socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int close(int fd) {
if(!sylar::t_hook_enable) {
return close_f(fd);
}

sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance() -> get(fd);
if(ctx) {
auto iom = sylar::IOManager::GetThis();
if(iom) {
// 取消所有事件
iom -> cancelAll(fd);
}
// 在文件管理中删除fd
sylar::FdMgr::GetInstance() -> del(fd);
}
return close_f(fd);
}

2.3.10 fcntl()

  对用户反馈是否是用户设置了非阻塞模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int fcntl(int fd, int cmd, ... /* arg */) {
va_list va;
va_start(va, cmd);
switch (cmd) {
case F_SETFL:
{
int arg = va_arg(va, int);
va_end(va);
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance()->get(fd);
if (!ctx || ctx->isClosed()) {
return fcntl_f(fd, cmd, arg);
}
ctx->setUserNonblock(arg & O_NONBLOCK);
if (ctx->getSysNonblock()) {
arg |= O_NONBLOCK;
} else {
arg &= ~O_NONBLOCK;
}
return fcntl_f(fd, cmd, arg);
}
.....
case F_GETFL:
{
va_end(va);
int arg = fcntl_f(fd, cmd);
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance()->get(fd);
if (!ctx || ctx->isClosed() || !ctx->isSocket()) {
return arg;
}
if (ctx->getUserNonblock()) {
return arg | O_NONBLOCK;
} else {
return arg & ~O_NONBLOCK;
}
}
.....
}
}

2.3.11 ioctl()

  对设备进行控制操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*	value为指向int类型的指针,如果该指针指向的值为0,则表示关闭非阻塞模式;如果该指针指向的值为非0,则表示打开非阻塞模式。
* int value = 1;
* ioctl(fd, FIONBIO, &value);
*/
int ioctl(int d, unsigned long int request, ...) {
va_list va;
va_start(va, request);
void* arg = va_arg(va, void*);
va_end(va);
// FIONBIO用于设置文件描述符的非阻塞模式
if(FIONBIO == request) {
bool user_nonblock = !!*(int*)arg;
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance() -> get(d);
if(!ctx || ctx -> isClose() || !ctx -> isSocket()) {
return ioctl_f(d, request, arg);
}
ctx -> setUserNonblock(user_nonblock);
}
return ioctl_f(d, request, arg);
}

2.3.12 setsockopt()

  设置socket选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen) {
if(!sylar::t_hook_enable) {
return setsockopt_f(sockfd, level, optname, optval, optlen);
}
// 如果设置socket通用选项
if(level == SOL_SOCKET) {
// 如果设置超时选项
if(optname == SO_RCVTIMEO || optname == SO_SNDTIMEO) {
sylar::FdCtx::ptr ctx = sylar::FdMgr::GetInstance() -> get(sockfd);
if(ctx) {
const timeval* v = (const timeval*)optval;
// 转为毫秒保存
ctx -> setTimeout(optname, v -> tv_sec * 1000 + v -> tv_usec / 1000);
}
}
}
return setsockopt_f(sockfd, level, optname, optval, optlen);
}

3. 总结

  有了hook模块的加持,在使用IO协程调度器时,如果不想该操作导致整个线程的阻塞,我们可以使用scheduler将该任务加入到任务队列中,这样当任务阻塞时,只会使执行该任务的协程挂起,去执行别的任务,在消息来后或者达到超时时间继续执行该协程任务,这样就实现了异步操作。