C++服务器框架:协程库——线程模块

提供线程类和线程同步类,基于pthread实现

1. 线程模块概述

该模块基于pthread实现。c++11中的thread也是由pthread封装实现的,但是没有提供读写互斥量,读写锁,自旋锁等,所以sylar在项目中自己封装了pthread

包括以下类:

  • Thread:线程类,构造函数传入线程入口函数和线程名称,线程入口函数类型为void(),如果带参数,则需要用std::bind进行绑定。线程类构造之后线程即开始运行,构造函数在线程真正开始运行之后返回。

锁模块封装了信号量、互斥量、读写锁、自旋锁、原子锁:

  • Semaphore:技术信号量,基于sem_t实现。
  • Mutex:互斥锁,基于pthread_mutex_t实现。
  • RWMutex:读写锁,基于pthread_rwlock_t实现。
  • Spinlock:自旋锁,基于pthread_spinlock_t实现。
  • CASLock:原子锁,基于std::atomic_flag实现。

关于线程id的问题,在获取线程id时使用syscall获得唯一的线程id

1
2
3
进程pid: getpid()                 
线程tid: pthread_self()     //进程内唯一,但是在不同进程则不唯一。
线程pid: syscall(SYS_gettid)     //系统内是唯一的

2. Thread

定义了两个线程局部变量用于指向当前线程以及线程的名称。

static thread_localC++中的一个关键字组合,用于定义静态线程本地存储变量。具体来说,当一个变量被声明为static thread_local时,它会在每个线程中拥有自己独立的静态实例,并且对其他线程不可见。这使得变量可以跨越多个函数调用和代码块,在整个程序运行期间保持其状态和值不变。

需要注意的是,由于静态线程本地存储变量是线程特定的,因此它们的初始化和销毁时机也与普通静态变量不同。具体来说,在每个线程首次访问该变量时会进行初始化,在线程结束时才会进行销毁,而不是在程序启动或运行期间进行一次性初始化或销毁。

1
2
3
4
// 指向当前线程 
static thread_local Thread *t_thread = nullptr;
// 指向线程名称
static thread_local std::string t_thread_name = "UNKNOW";

成员变量:

1
2
3
4
5
pid_t m_id = -1;                // 线程id, 默认为-1, 表示无效
pthread_t m_thread = 0; // 线程结构体
std::function<void()> m_cb; // 线程执行函数
std::string m_name; // 线程名称
Semaphore m_semaphore; // 信号量

2.1 成员函数

2.1.1 Thread()

初始化线程执行函数、线程名称,创建新线程。

函数原型:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);,是POSIX线程库中的一个函数,用于创建一个新的线程。

  • thread:指向pthread_t类型的指针,用于存储新创建线程的ID
  • attr:指向pthread_attr_t类型的指针,用于设置新线程的属性,一般传入NULL表示使用默认属性。
  • start_routine:一个函数指针,指向新线程将要执行的函数,该函数的原型为void* (*)(void*),接受一个void*类型的参数,返回一个void*类型的指针。
  • arg:传递给start_routine函数的参数。

调用pthread_create函数后,将会创建一个新线程,并开始执行通过start_routine传递给它的函数。新线程的ID将存储在thread指向的变量中。请注意,新线程将在与调用pthread_create函数的线程并发执行的情况下运行。

1
2
3
4
5
6
7
8
9
10
11
12
Thread::Thread(std::function<void()> cb, const std::string &name)
: m_cb(cb)
, m_name(name) {
if(name.empty()) m_name = "unknow";
int res = pthread_create(&m_thread, nullptr, &Thread::run, this); // 创建线程, 成功返回0
if(res) {
SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, res = " << res
<< " name = " << name;
throw std::logic_error("pthread_create error");//创建线程失败,抛出异常
}
m_semaphore.wait(); // 等待线程执行
}

2.1.2 ~Thread()

首先检查m_thread是否存在,如果存在,则调用pthread_detach(m_thread)函数来分离已经结束的线程。pthread_detach函数用于释放与线程关联的资源,并确保线程可以安全地终止。通过在析构函数中分离线程,可以避免在主线程退出时出现悬挂线程,从而防止内存泄漏和其他问题。

1
2
3
4
5
Thread::~Thread() {
if(m_thread) {
pthread_detach(m_thread); // 分离线程
}
}

2.1.3 join()

用于等待指定线程的终止,并获取该线程的返回值。它的原型为:

1
int pthread_join(pthread_t thread, void **retval);
  • thread:要等待的线程的ID
  • retval:指向指针的指针,用于存储线程的返回值。如果不需要获取返回值,则可以将其设置为NULL。 pthread_join函数是POSIX线程API中的一种同步原语,它允许调用线程等待另一个线程的终止。当一个线程调用pthread_join时,它会阻塞直到指定的线程终止。一旦目标线程终止,调用线程恢复执行,并可以选择通过retval参数获取线程的返回值。如果调用线程不关心返回值,也可以将retval参数设置为NULL
1
2
3
4
5
6
7
8
9
10
11
12
void Thread::join()
{
if(m_thread) {
int res = pthread_join(m_thread, nullptr); // 等待线程执行完成,成功返回0
if(res) {
SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, res = " << res
<< " name = " << m_name;
throw std::logic_error("pthread_join error");//线程执行失败,抛出异常
}
m_thread = 0;//线程执行完成,将线程结构置为0
}
}

2.1.4 run()

线程执行函数,通过信号量,能够确保构造函数在创建线程之后会一直阻塞,直到run方法运行并通知信号量,构造函数才会返回。

在构造函数中完成线程的启动和初始化操作,可能会导致线程还没有完全启动就被调用,从而导致一些未知的问题。因此,在出构造函数之前,确保线程先跑起来,保证能够初始化id,可以避免这种情况的发生。同时,这也可以保证线程的安全性和稳定性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void *Thread::run(void *arg) {
// 拿到新创建的Thread对象, 更新当前线程
Thread *thread = (Thread *)arg;
t_thread = thread;
t_thread_name = thread -> m_name;
// 设置当前线程的id
// 只有进了run方法才是新线程在执行,创建时是由主线程完成的,threadId为主线程的
thread -> m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread -> m_name.substr(0, 15).c_str());//设置线程名称

// pthread_creat时返回引用 防止函数有智能指针,
std::function<void()> cb;
cb.swap(thread -> m_cb);//交换线程执行函数
// 在出构造函数之前,确保线程先跑起来,保证能够初始化id
thread -> m_semaphore.notify();

cb(); // 执行线程函数
return 0;
}

3 线程同步类

3.1 Semaphore

Semaphore类是一个信号量类,用于实现线程之间的同步和互斥。信号量是一种用于控制对共享资源的访问的同步原语,它通常用于限制对资源的并发访问,以避免竞争条件和数据竞争。可以看作是一个计数器,用来表示某种资源的数量或者可用的某种事物的数量。线程可以通过等待信号量来减少其值,从而表示它正在使用资源,也可以通过释放信号量来增加其值,从而表示它已经释放资源。

3.1.1 Semaphore()

初始化信号量,函数原型:int sem_init(sem_t *sem, int pshared, unsigned int value);

  • sem:指向sem_t类型的指针,用于存储信号量对象。
  • pshared:如果非零,表示这个信号量可以被其他进程访问;如果为零,表示这个信号量只对当前进程内的线程可见。
  • value:信号量的初始值。在信号量中,这个值通常用来表示资源的数量或者可用的某种事物的数量。

该函数成功时返回0,出错时返回-1,并设置errno以指示错误类型。 > 使用sem_init初始化信号量后,线程可以使用sem_wait(或sem_wait的宏P)、sem_post(或V宏)等函数来等待(减少信号量的值)或释放(增加信号量的值)资源。

1
2
3
4
5
Semaphore::Semaphore(uint32_t count) {
if(sem_init(&m_semaphore, 0, count)) {
throw std::logic_error("sem_init error"); //初始化信号量失败,抛出异常
}
}

3.1.2 ~Semaphore()

销毁信号量,函数原型:int sem_destroy(sem_t *sem);

注意,只有在确保没有任何线程或进程正在使用该信号量时,才应该调用sem_destroy()函数。否则,可能会导致未定义的行为。此外,如果在调用sem_destroy()函数之前,没有使用sem_post()函数将信号量的值增加到其初始值,则可能会导致在销毁信号量时出现死锁情况。

1
2
3
Semaphore::~Semaphore() {
sem_destroy(&m_semaphore); //销毁信号量
}

3.1.3 wait()

等待信号量,函数原型:int sem_wait(sem_t *sem);

其中,参数sem是指向要获取的信号量的指针。如果在调用此函数时信号量的值大于零,则该值将递减并立即返回。如果信号量的值为零,则当前线程将被阻塞,直到信号量的值大于零或者被信号中断。

如果信号量的值大于0,sem_wait会立即返回并将信号量的值减1,如果信号量的值为0,sem_wait会阻塞,直到信号量的值大于0。

当线程成功获取信号量时,可以执行相应的操作来使用资源。使用完资源后,可以通过调用sem_post()函数来增加信号量的值以释放资源,并使其他等待线程得以继续执行。

1
2
3
4
5
void Semaphore::wait() {
if(sem_wait(&m_semaphore)) {
throw std::logic_error("sem_wait error"); //等待信号量失败,抛出异常
}
}

3.1.4 notify()

通知信号量,函数原型:int sem_post(sem_t *sem);用于向指定的命名或未命名信号量发送信号,使其计数器加1。如果有进程或线程正在等待该信号量,那么其中一个将被唤醒以继续执行。

参数sem:指向要增加计数器的信号量的指针。成功时返回0,出错时返回-1,并设置errno以指示错误类型。

1
2
3
4
5
void Semaphore::notify() {
if(sem_post(&m_semaphore)) {
throw std::logic_error("sem_post error"); //通知信号量失败,抛出异常
}
}

3.2 Mutex

Mutex类是一个互斥锁类,用于实现线程之间的互斥。互斥锁是一种同步原语,用于控制对共享资源的访问。互斥锁可以确保在任何时候只有一个线程可以访问共享资源,从而避免竞争条件和数据竞争。

  • 互斥锁必须由锁定它的同一线程解锁。
  • 一个线程不能解锁另一个线程锁定的互斥锁。
  • 尝试解锁一个未被锁定的互斥锁会导致错误。

成员变量:

1
pthread_mutex_t m_mutex; // 互斥锁

3.2.1 Mutex()

构造函数,初始化互斥锁,函数原型:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

  • mutex:指向pthread_mutex_t类型的指针,用于存储互斥锁对象。
  • attr:指向pthread_mutexattr_t类型的指针,表示互斥锁的属性。如果为NULL,则使用默认属性。

成功时返回0,失败时返回错误码并设置errno变量。

1
2
3
4
5
Mutex::Mutex() {
if(pthread_mutex_init(&m_mutex, nullptr)) {
throw std::logic_error("pthread_mutex_init error"); //初始化互斥锁失败,抛出异常
}
}

3.2.2 ~Mutex()

析构函数,销毁已初始化的互斥锁对象,函数原型:int pthread_mutex_destroy(pthread_mutex_t *mutex);

  • mutex:指向要销毁的互斥锁对象的指针。

成功时返回0,失败时返回错误码并设置errno变量。

1
2
3
Mutex::~Mutex() {
pthread_mutex_destroy(&m_mutex); //销毁互斥锁
}

3.2.3 lock()

加锁,函数原型:int pthread_mutex_lock(pthread_mutex_t *mutex);

  • mutex:指向要加锁的互斥锁对象的指针。

成功时返回0,失败时返回错误码并设置errno变量。

当一个线程调用pthread_mutex_lock()时,如果当前该互斥锁没有被其它线程持有,则该线程会获得该互斥锁,并将其标记为已被持有;如果该互斥锁已经被其它线程持有,则当前线程会被阻塞,直到该互斥锁被释放并重新尝试加锁。

1
2
3
void lock() {
pthread_mutex_lock(&m_mutex);
}

3.2.4 unlock()

解锁,函数原型:int pthread_mutex_unlock(pthread_mutex_t *mutex);

  • mutex:指向要解锁的互斥锁对象的指针。

成功时返回0,失败时返回错误码并设置errno变量。

当一个线程调用pthread_mutex_unlock()时,它会释放该互斥锁,并且如果有其它线程正在等待该锁,则其中一个线程将被唤醒以继续执行。
如果当前没有线程解锁一个未被锁定的互斥锁,或者不是由该线程锁定的互斥锁,那么pthread_mutex_unlock()函数将返回EPERM错误,表示操作不允许

1
2
3
void unlock() {
pthread_mutex_unlock(&m_mutex);
}

3.3 RWMutex

RWMutex类是一个读写锁类,用于实现读写锁,在多线程环境下对共享资源进行访问控制。与互斥锁不同,读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这种机制可以提高读取性能,但是写入性能会受到影响,还需要注意避免读写锁死锁等问题。

成员变量:

1
pthread_rwlock_t m_lock; // 读写锁

3.3.1 RWMutex()

构造函数,初始化读写锁,函数原型:int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);

  • rwlock:指向pthread_rwlock_t类型的指针,用于存储读写锁对象。
  • attr:指向pthread_rwlockattr_t类型的指针,表示读写锁的属性。如果为NULL,则使用默认属性。

成功时返回0,失败时返回错误码并设置errno变量。

1
2
3
RWMutex::RWMutex() {
pthread_rwlock_init(&m_lock, nullptr)
}

3.3.2 ~RWMutex()

析构函数,销毁已初始化的读写锁对象,函数原型:int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

  • rwlock:指向要销毁的读写锁对象的指针。

成功时返回0,失败时返回错误码并设置errno变量。

1
2
3
RWMutex::~RWMutex() {
pthread_rwlock_destroy(&m_lock);
}

3.3.3 rdlock()

加读锁,int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);用于获取读取锁(pthread_rwlock_t)上的共享读取访问权限。它允许多个线程同时读取共享资源,但不能写入它。如果有线程已经持有写入锁,则其他线程将被阻塞直到写入锁被释放。调用此函数时,如果另一个线程已经持有写入锁,则该线程将被阻塞,直到写入锁被释放。

1
2
3
void rdlock() {
pthread_rwlock_rdlock(&m_lock);
}

3.3.4 wrlock()

加写锁,int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);用于获取写入锁(pthread_rwlock_t)上的独占写入访问权限。它阻止其他线程读取或写入共享资源,直到写入锁被释放。如果有线程已经持有读取锁或写入锁,则其他线程将被阻塞直到写入锁被释放。

1
2
3
void wrlock() {
pthread_rwlock_wrlock(&m_lock);
}

3.3.5 unlock()

解锁,int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);有以下几点需要注意。

  1. 释放锁:当一个线程完成对共享资源的访问后,它需要调用pthread_rwlock_unlock()来释放它所持有的锁。这可以是读锁或写锁。
  2. 未定义行为:如果调用pthread_rwlock_unlock()的线程并没有持有任何锁(即没有通过pthread_rwlock_rdlock()pthread_rwlock_wrlock()成功获取锁),则会导致未定义的行为。意味着程序可能会出现错误或异常。
  3. 锁的销毁:如果读写锁已经被销毁(通过pthread_rwlock_destroy()),则再次调用pthread_rwlock_unlock()也会导致未定义的行为。因此,在销毁锁之前,确保所有的锁都已经被正确释放。
  4. 正确的同步:在使用读写锁时,需要确保在访问共享资源前后正确地获取和释放锁,防止死锁和确保资源的一致性。
  5. 资源访问:释放锁后,其他线程可以获取相应的锁并访问共享资源。这允许在多线程环境中对资源进行并发访问,同时保持数据的完整性。
1
2
3
void unlock() {
pthread_rwlock_unlock(&m_lock);
}

3.4 Spinlock

mutex不同,自旋锁不会使线程进入睡眠状态,而是在获取锁时进行忙等待,直到锁可用。
当锁被释放时,等待获取锁的线程将立即获取锁,从而避免了线程进入和退出睡眠状态的额外开销。

成员变量:

1
pthread_spinlock_t m_mutex; // 自旋锁

3.4.1 Spinlock()

构造函数,初始化自旋锁,函数原型:int pthread_spin_init(pthread_spinlock_t *lock, int pshared);

  • lock:指向pthread_spinlock_t类型的指针,用于存储自旋锁对象。
  • pshared:如果为0,则自旋锁是进程内的;如果为1,则自旋锁是进程间的。

在调用pthread_spin_init函数之前,必须先分配内存空间来存储自旋锁变量。与pthread_rwlock_t类似,需要在使用自旋锁前先进行初始化才能正确使用。

1
2
3
Spinlock::Spinlock() {
pthread_spin_init(&m_mutex, 0);
}

3.4.2 ~Spinlock()

析构函数,销毁已初始化的自旋锁对象,函数原型:int pthread_spin_destroy(pthread_spinlock_t *lock);

该函数确保在销毁自旋锁之前所有等待的线程都被解除阻塞并返回适当的错误码。如果自旋锁已经被销毁,则再次调用pthread_spin_destroy将导致未定义的行为。

1
2
3
Spinlock::~Spinlock() {
pthread_spin_destroy(&m_mutex);
}

3.4.3 lock()

加锁,函数原型:int pthread_spin_lock(pthread_spinlock_t *lock);

mutex不同,自旋锁在获取锁时忙等待,即不断地检查锁状态是否可用,如果不可用则一直循环等待,直到锁可用。当锁被其他线程持有时,调用pthread_spin_lock()的线程将在自旋等待中消耗CPU时间,直到锁被释放并获取到锁。

1
2
3
void lock() {
pthread_spin_lock(&m_mutex);
}

3.4.4 unlock()

解锁,函数原型:int pthread_spin_unlock(pthread_spinlock_t *lock);

调用该函数可以使其他线程获取相应的锁来访问共享资源。与mutex不同,自旋锁在释放锁时并不会导致线程进入睡眠状态,而是立即释放锁并允许等待获取锁的线程快速地获取锁来访问共享资源,从而避免了线程进入和退出睡眠状态的额外开销。

1
2
3
void unlock() {
pthread_spin_unlock(&m_mutex);
}

3.5 CASLock

CASLock类是一个原子锁类,用于实现原子操作。原子操作是一种不可中断的操作,它要么全部执行,要么全部不执行,不会出现部分执行的情况。用于实现线程同步和互斥,以确保对共享资源的访问是安全的。

成员变量:

1
2
3
// m_mutex是一个原子布尔类型,具有特殊的原子性质,可以用于实现线程间同步和互斥。
// volatile关键字表示该变量可能会被异步修改,因此编译器不会对其进行优化,而是每次都从内存中读取该变量的值。
std::atomic_flag m_mutex; // 原子锁

3.5.1 CASLock()

atomic_flag.clear()C++标准库中的一个原子操作函数,用于将给定的原子标志位(atomic flag)清除或重置为未设置状态。

在多线程编程中,原子标志位通常用于实现简单的锁机制,以确保对共享资源的访问是互斥的。使用atomic_flag.clear()可以轻松地重置标志位,使之再次可用于控制对共享资源的访问。需要注意的是,由于该函数是一个原子操作,因此可以安全地在多个线程之间使用,而无需担心竞态条件和数据竞争等问题。

1
2
3
CASLock::CASLock() {
m_mutex.clear();
}

3.5.2 lock()

std::atomic_flag_test_and_set_explicit()C++标准库中的一个原子操作函数,用于测试给定的原子标志位(atomic flag)是否被设置,并在测试后将其设置为已设置状态。该函数接受一个指向原子标志位对象的指针作为参数,并返回一个布尔值,表示在调用函数前该标志位是否已经被设置。第二个可选参数order用于指定内存序,以控制原子操作的内存顺序和同步行为。通过循环等待实现了互斥锁的效果。

std::memory_order_acquireC++中的一种内存序,用于指定原子操作的同步和内存顺序。具体来说,使用std::memory_order_acquire可以确保在当前线程获取锁之前,所有该线程之前发生的写操作都被完全同步到主内存中。这样可以防止编译器或硬件对写操作进行重排序或延迟,从而确保其他线程可以正确地读取共享资源的最新值。

1
2
3
void lock() {
while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire)) ;
}

3.5.3 unlock()

std::atomic_flag_clear_explicit()C++标准库中的一个原子操作函数,用于将给定的原子标志位(atomic flag)清除或重置为未设置状态。该函数接受一个指向原子标志位对象的指针作为参数,并使用可选的第二个参数order来指定内存序,以控制原子操作的同步和内存顺序。

1
2
3
void unlock() {
m_mutex.clear();
}

4. 总结

  • 对日志系统的临界资源进行互斥访问时,使用自旋锁而不是互斥锁。

    1. mutex使用系统调用将线程阻塞,并等待其他线程释放锁后再唤醒它,这种方式适用于长时间持有锁的情况。而spinlock在获取锁时忙等待,即不断地检查锁状态是否可用,如果不可用则一直循环等待,因此适用于短时间持有锁的情况。
    2. 由于mutex会将线程阻塞,因此在高并发情况下可能会出现线程频繁地进入和退出睡眠状态,导致系统开销大。而spinlock虽然不会使线程进入睡眠状态,但会消耗大量的CPU时间,在高并发情况下也容易导致性能问题。
    3. 当一个线程尝试获取已经被其他线程持有的锁时,mutex会将该线程阻塞,而spinlock则会在自旋等待中消耗CPU时间。如果锁的持有时间较短,则spinlockmutex更适合使用;如果锁的持有时间较长,则mutexspinlock更适合使用。
  • 在构造函数中创建子进程并等待其完成执行是一种常见的技术,可以通过信号量(Semaphore)来实现主线程等待子线程完成。

    1. 首先,在主线程中创建一个Semaphore对象并初始化为0。然后,在构造函数中创建子线程,并将Semaphore对象传递给子线程。子线程将执行所需的操作,并在最后使用Semaphore对象发出信号通知主线程它已经完成了工作。
    2. 主线程在构造函数中调用Semaphore对象的wait方法,这会使主线程阻塞直到收到信号并且Semaphore对象的计数器值大于0。当子线程发出信号时,Semaphore对象的计数器值增加1,因此主线程可以继续执行构造函数的剩余部分。