使用C++11新特性实现一个通用的线程池设计

描述

在C++11标准之前,多线程编程只能使用pthread_xxx开头的一组POSIX标准的接口。从C++11标准开始,多线程相关接口封装在了C++的std命名空间里。

在Linux中,即便我们用std命名空间中的接口,也需要使用-lpthread链接pthread库,不难猜出,C++多线程的底层依然还是POSIX标准的接口。你可能会有疑问,既然底层还是使用pthread库,为什么不直接用pthread,而要绕一大圈再封装一层呢?

在我看来,除了统一C++编程接口之外,C++11标准更多的是在语言层面做了很多优化,规避了原来C语言中的很多陷阱,比如C++11中的lock_guard、future、promise等技术,将原来C语言中语法上容易犯错的地方进行了简化,简单来说就是将原来依赖人的地方交给了编译器(很多时候机器比人更可靠)。比如在C++11标准之前,我们使用mutex是像下面这样的:

 

pthread_mutex_lock(mutex)
....
if (condition) {
  ...
} elser if {
  ...
} else {
  ...
}
...
pthread_mutex_unlock(mutex)
相信有mutex使用经验的人都或多或少都在这上面踩过坑,比如少写了一个unlock,中间异常退出没有执行到unlock等等各种各样的情况导致的锁没有被正确释放。而在C++11标准中,我们只需要使用lock_guard就可以了,比如:
lock_gruard locker(mutex)
...
if (condition) {
  ...
} elser if {
  ...
} else {
  ...
}
...

 

C++编译器会自动为我们插入释放锁的代码,这样我们就不用时刻担心锁有没有正确释放了。我个人的感觉是,从C++11标准开始,C++变得不那么可怕了,甚至很多时候觉得C++变得好用了。

这篇文章我们试着使用C++11标准的新特性来实现一个通用的线程池。首先,我们来看一下C++11标准中线程是如何创建的,先看代码:

 

#include 
#include 


using namespace std;
void threadFunc(int &a) {
    a += 10;
    std::cout << "this is thread fun!" << std::endl;
}


int main() {
    int x = 10;
    std::thread t1(threadFunc, std::ref(x));
    t1.join();


    std::cout << "x=" << x << std::endl;
}

 

使用std::thread创建一个t1线程对象,传入线程主函数,将x以引用的方式传入线程主函数,接着调用join方法等待主函数threadFunc执行完成。上面x最终的结果将是20。

整个流程和使用pthread_create其实区别不大,只是C++11将线程的创建封装成了一个类,使得我们可以用面向对象编程的方式来创建多线程。

我们可以思考一下,要实现一个线程池,应该怎么做?这个问题我们可以倒过来想一下,线程池应该怎么使用。比如,对于一个web服务器来讲,为了能利用多核的优势,我们可以在主线程里接收到请求之后,将对这个请求的具体操作交给worker线程,比如像下面这样:

线程池

这个流程我们在C语言网络编程那个系列文章中有非常详细的说明,如果你get不到这个例子是什么意思,建议你去看一下那个系列的文章(C语言网络编程系列合集)

主线程将请求交给Worker线程这个好理解,但是你有没有想过,它怎么就能交给Worker线程呢?线程创建完之后,对应的主函数就已经运行起来了,对应accept出来的套接字要怎么传递给一个正在运行的函数呢?

为了说明这个问题,我们先来看一段Golang的代码,暂时看不懂没关系,后面我会解释

 

func TestWorker(t *testing.T) {
  msg := make(chan int, 1)
  notify := make(chan struct{}, 0)
  poolSize := 10


  buildWorkers(poolSize, msg, notify)


  // 模拟任务
  for i := 0; i < poolSize; i++ {
    msg <- i
  }


  for i := 0; i < poolSize; i++ {
    <-notify
  }
}


func buildWorkers(poolSize int, msg <-chan int, notify chan<- struct{}) {
  for i := 0; i < poolSize; i++ {
    go func(i int) {
      if ret := recover(); ret != nil {
        log.Printf("panic: %v", ret)
      }


      log.Println("worker-id:", i)


      for {
        select {
        case m := <-msg:
          fmt.Println("m:", m)
          notify <- struct{}{}
        }
      }
    }(i)
  }
}

 

buildWorkers方法创建了10个协程,每个协程里面都有一个for循环,一开始每个for循环都阻塞在select语句中的case m := ←msg,如果之前没有接触过Go语言(这里的select你可以简单和Linux中的select技术类比)。另外还有两个通道,一个是msg和notify,msg用来传递参数,notify用来通知外面这个协程任务已经执行完了。

在TestWorker方法中,我们模拟了10个任务,这10个任务不断的往msg通道中发送数据,当msg有数据之后,我们创建的那10个协程就会争取从msg通道接收消息,只要接收到消息就说明这是执行任务所必需的参数。执行完成之后向notify发送消息。在TestWorker中我们同样接收了10次,在没有消息的时候就会阻塞在<-notify这一行,直到有协程执行完成向notify通道发送消息,这里就从<-notify返回,进入下一次循环。上面其实就是一个非常简单的协程池了,当然为了演示,代码并不是很完整。

运行上面的代码,得到的结果大概像下面这样

 

2023/10/07 21:37:44 worker-id: 9
2023/10/07 21:37:44 worker-id: 4
2023/10/07 21:37:44 worker-id: 8
2023/10/07 21:37:44 m: 2
2023/10/07 21:37:44 m: 0
2023/10/07 21:37:44 worker-id: 5
2023/10/07 21:37:44 m: 3
2023/10/07 21:37:44 worker-id: 2
2023/10/07 21:37:44 worker-id: 1
2023/10/07 21:37:44 m: 5
2023/10/07 21:37:44 m: 4
2023/10/07 21:37:44 worker-id: 7
2023/10/07 21:37:44 m: 6
2023/10/07 21:37:44 worker-id: 0
2023/10/07 21:37:44 m: 7
2023/10/07 21:37:44 m: 1
2023/10/07 21:37:44 worker-id: 6
2023/10/07 21:37:44 m: 8
2023/10/07 21:37:44 m: 9
2023/10/07 21:37:44 worker-id: 3

 

从上面的结果来看,协程运行并不是顺序执行的,这和多线程是一样的道理。上面Golang的代码执行的流程我画了一张图,如下:

线程池

注意箭头的方向,所有协程都不断的尝试从channel中接收消息,拿到程序运行必要的参数,当msg中有数据时从case m := <-msg中苏醒并执行具体的业务逻辑,我们知道,在Golang中channel是线程安全的,其内部有一把锁,这把锁就是mutex,下面是channel底层结构体

 

// src/runtime/chan.go:33
type hchan struct {
  ...
  lock mutex
}

 

channel除了能保证线程安全,还能保证顺序性,也就是发送方最先发送的,在接收方一定也是最先收到的。这不就是一个加了锁的队列吗?我们可以试着想一下在C++中是不是也可以实现类似的效果呢?不难想到,我们可以使用一个队列在各个线程之间传递数据,像下面这样:

线程池

主线程accept出来的套接字,只管往队列里面丢就可以了,我们创建的一堆worker线程,不断的尝试从队列里面pop数据。这样,我们就解决了线程之间的交互问题。

下面,我们就参照上面Golang的代码,先把这个框架给搭出来,然后再在这个基础之上去完善代码,最后实现一个准生产的线程池。

我们先参照上面Golang的代码,实现相似逻辑,代码如下:

 

#include 
#include 
#include 


void threadFunc(std::queue& q) {
    while (1) {
        if (!q.empty()) {
            int param = q.front();
            q.pop();
            std::cout << "param:" << param << std::endl;
        }
    }
}


void jobDispatch(std::queue& q) {
    for (int i = 0; i < 1000; i++) {
        q.push(i);
    }
}


int main() {
   std::queue q1;


   std::vector ths;
   for (int i = 0; i < 10; i++) {
       ths.emplace_back(threadFunc, std::ref(q1));
   }


   jobDispatch(q1);


   for (auto& th: ths) {
       th.join();
   }


   return 0;
}

 

上面的代码尽可能的还原了Golang的逻辑,我们来分析一下这段代码。在main函数中,创建了一个队列q1,这个队列用来向线程池传递参数,接着创建了10个线程保存在了vector中,将q1以引用的形式传入线程池主函数(注意:这里传引用必须使用std::ref包装一下),再接着调用jobDispatch模拟任务分配然后每个线程调用join等待结束。

接着我们来看线程池主函数threadFunc,这个函数接收一个队列q作为参数,这里的q就是我们在创建线程池的时候传进来的q1,然后是一个死循环,在这个循环里面我们不断的判断队列是否为空,如果不为空就从队列取出一个元素出来。最后,分配任务的函数jobDispatch向队列q1里面push了1000个元素,来模拟1000个任务。

上面的代码当然是有问题的,有兴趣的可以把这段代码拷贝下来把自己跑一下,你会发现虽然代码能跑,但是结果完全看不懂。

首先,第一个问题就是queue不是线程安全的。所以,这个队列得有一把锁,比如:

 

std::mutex mtx;


void threadFunc(std::queue& q) {
    while (true) {
        if (!q.empty()) {
            std::lock_guard ltx(mtx);
            int param = q.front();
            q.pop();
            std::cout << "param:" << param << std::endl;
        }
    }
}

 

我们在threadFund函数中的出队列之前加了一把锁。这把锁是全局的,每个线程都要先拿到这把锁之后才能从队列里拿到数据,你可以把这段代码替换之后再运行一下,这次的结果应该是正确的了。

可能你觉得奇怪,我们使用lock_guard创建了一个ltx对象,但是并没有地方去释放这把锁,如果你有这样的疑问应该是对C++11还不是很熟悉,在C++11标准中,因为有了RAII的缘故,一个对象被销毁时一定会执行析构函数,就算是运行过程中对象产生异常析构函数也会执行,在C++中这叫栈展开。有了这个特性之后,lock_guard就不难理解了,其构造函数其实就是调用了mutex的lock方法,然后把这个mutex保存在成员变量中,当对象销毁时析构函数中调用unlock。所以,有了这个机制之后,我们就不用到处写unlock了,这也是我觉得C++更好用了的原因之一。

在C++中同样遵循大括号作用域,在上面的代码中,lock_guard是在if语句中的,当if语句执行完之后,ltx就被销毁了,所以当循环进入到下一次的时候实际上锁已经被释放了。

这样我们就解决了队列的线程安全问题,但眼尖的你一定看出来其实还有一个问题,threadFunc函数中的死循环一直在空转,这显然是有问题的。解决这个问题最容易想到的就是每次循环都sleep一下,但这显然也是有问题的,对于一个有实时要求的系统是不能接受的。

所以,我们迫切需要一种机制,让threadFunc没事干的时候就停在那等通知,想想看什么技术可以实现?对,就是cond,在C++11中条件变量也被封装在了std::命名空间中。下面我们就使用cond来改造一下,相关代码如下:

 

std::mutex mtx;
std::condition_variable cond;   // v2


void threadFunc(std::queue& q) {
    while (true) {
        std::unique_lock ltx(mtx);       // v2
        cond.wait(ltx, [q]() { return !q.empty();}); // v2


        int param = q.front();
        q.pop();
        std::cout << "param:" << param << std::endl;
    }
}


void jobDispatch(std::queue& q) {
    for (int i = 0; i < 1000; i++) {
        q.push(i);
    }
    cond.notify_all();  // v2
}

 

修改后的代码我在后面都加了注释(v2), 首先我们定义了一个全局的条件变量cond,然后在threadFunc中调用cond的wait方法等待通知。然后在jobDispatch中往队列里面写完数据之后调用notify_all方法通知所有等待的线程。这样,当队列中没有数据的时候线程池中的线程就会进入睡眠,直到notify_all被调用。这里你可以想一下,上面notify_all还可以进一步优化吗?

当然,上面还作了一个调整,就是将原来的lock_guard换了unique_lock,这个改动是必须的,我们知道cond调用wait的时候会尝试释放锁,可lock_guard里面没有释放锁的方法,而unique_lock是有unlock方法的。也就是说,unique_lock创建的ltx对象可以手动调用unlock方法释放锁。

好了,到这里其实我们已经写出一个简单的线程池了,这个线程池通过一个队列传递参数,使用mutex解决线程安全问题,cond解决性能问题。看起来已经和上面Golang的逻辑非常接近了。如果你使用Golang写过代码,并且上面C++的代码你也尝试写出来了,你就会惊叹于Golang简单了。好了,这里不吹Golang了,我们继续回到C++上来。

当然,到这里还远远没完呢,C++的看家本领是啥?对,是面向对象编程。上面的代码很显然没有面向对象的味道。下面我们就使用面向对象的思想来实现一个线程池。这里直接给出代码

 

#include 
#include 
#include 
#include 


class TPool {
public:
    TPool(): m_thread_size(1), m_terminate(false) {};
    ~TPool() { stop(); }
    // 线程池初始化
    bool init(size_t size);
    // 停止所有线程
    void stop();
    // 启动线程池的各个线程
    void start();
    // 任务执行入口
    template 
    auto exec(F&& f, A&&... args)->std::future;
    // 等待所有线程执行完成
    bool waitDone();


private:
    // 每个任务都是一个struct结构体,方便未来扩展
    struct Task {
        Task() {}
        std::function m_func;
    };
    // Task未来在队列中以智能指针的形式传递
    typedef std::shared_ptr TaskFunc;


private:
    // 尝试从任务队列获取一个任务
    bool get(TaskFunc &t);
    // 线程主函数
    bool run();


private:
    // 任务队列,将Task直接放到队列中
    std::queue m_tasks;
    // 线程池
    std::vector m_threads;
    // 锁
    std::mutex m_mutex;
    // 用于线程之间通知的条件变量
    std::condition_variable m_cond;
    // 线程池大小
    size_t m_thread_size;
    // 标记线程是否结束
    bool m_terminate;
    // 用来记录状态的原子变量
    std::atomic m_atomic{0};
};

 

我们定义了一个TPool类,这个类里面包含几个部分,我们从下往上看。第一个部分是线程池管理相关的各种资源,每一个我都写了注释。第二部分是任务相关的操作,这部分不对外开放。第三部分是任务定义,使用struct声明了一个Task,其中有一个空的构造函数,还声明了一个m_func,这是最终task执行的入口。最后一部分是线程池对外开放的各种接口。用户使用线程的大致流程如下:

线程池

这里我将线程的初始化和线程的启动分成了两步,是希望在使用的时候精确知道是哪一步出了问题,如果你觉得这样太繁琐,可以适当减少步骤,比如将init和start方法进行合并。

下面我们就来详细讲一下线程各个方法的实现。首先是init和start方法,init方法用来初始化线程,代码如下:

 

bool init(size_t size) {
    unique_lock lock(m_mutex);
    if (!m_threads.empty()) {
        return false;
    }
    m_thread_size = size;
    return true;
}

 

传入一个size,表示线程池的大小,上来就加锁,这是为了防止在多线程的情景下执行init,这个方法实际上只做了一件事,就是设置线程池的大小。

初始化完了之后,调用start方法启动线程池,start方法的代码如下:

 

bool start() {
    unique_lock lock(m_mutex);
    if (!m_threads.empty()) {
        return false;
    }


    for (size_t i = 0; i < m_thread_size; i++) {
        m_threads.push_back(new thread(&TPool::run, this));
    }
    return true;
}
同样,为了防止多线程语境,上来也是加了一把锁,如果m_threads不为空说明已经初始化过了,直接就返回了。接着就创建线程放到m_threads这个vector中,线程的主函数是当前类的run方法。这样,所有线程的主函数都跑起来了,接下来我们看一下线程主函数的代码,如下:
void run() {
    while(!m_terminate) {
        TaskFunc task;
        bool ok = get(task);
        if (ok) {
            ++m_atomic;


            task->m_func();


            --m_atomic;


            unique_lock lock(m_mutex);
            if (m_atomic == 0 && m_tasks.empty()) { // 是否所有任务都执行完成
                m_cond.notify_all();
            }
        }
    }
}

 

不出所料,run方法里其实就是一个死循环,这个循环上来就判断是否结束了,如果已经结束就退出当前循环,run方法返回,当前线程结束。

如果没有结束,就调用get方法从任务队列里取一个任务出来执行,这里使用一个原子变量来判断最后是不是所有任务都执行完成,这个原子变量不是必须的,你可以根据你自己的场景做相应的修改。取到任务之后,就会调用任务的m_func方法,还记得这个方法吗?它定义在Task结构体中。最后会判断是否所有任务都结束了,如果已经结束了会通知其它线程。

这里我们来看一下get方法是怎么获取任务的,get方法的代码如下:

 

bool get(TaskFunc &t) {
    unique_lock lock(m_mutex);
    if (m_tasks.empty()) {
        m_cond.wait(lock, [this]{return m_terminate || !m_tasks.empty();});
    }


    if (m_terminate)
        return false;


    t = std::move(m_tasks.front());
    m_tasks.pop();
    return true;
}

 

上来首先加了一把锁,如果任务队列没有任务可以执行,使用条件变量m_cond调用wait方法等待。

然后,如果此时线程已经被结束掉了,直接返回false,如果没有结束,就从队列中取出一个任务,赋值给传进来的t。注意,这里使用的是参数传值的方式。这样就实现了任务的传递,当没有任务的时候m_cond.wait会让当前进程进入睡眠,等待通知。

接下来,我们看一下任务是如何被投递到任务队列中的,用来投递任务的方法是exec,代码如下:

 

template 
auto exec(F&& f, A&&... args)->future {
    using retType = decltype(f(args...));
    auto task = make_shared>(bind(std::forward(f), std::forward(args)...));
    TaskFunc fPtr = make_shared();
    fPtr->m_func = [task](){
        (*task)();
    };


    unique_lock lock(m_mutex);
    m_tasks.push(fPtr);
    m_cond.notify_one();


    return task->get_future();
}

 

exec方法稍微有一点复杂,知识点非常密集,我们简单过一下逻辑。首先,我们将exec方法声明成了模板函数,有两个参数,F表示任务最终执行的方法,A是一个可变参数,实际就是传给函数f的参数,返回值只有在运行的时候才会知道,所以这里使用了自动类型推导,并且配合了decltype关键字,->future这句的意思是最终会返回一个future,这个future的类型是由decltype推导出f函数返回值的类型,这里有点绕,如果看不明白的话还是得去看一下future和decltype是怎么回事。

进入函数内部,我们一行一行讲,首先是

 

using retType = decltype(f(args...));
decltype用于查询表达的类型,这里的语义表达的就是f(args…)这个表达式最终返回的类型。接着,下一行是创建一个task,这个task是一个智能指针

auto task = make_shared>(bind(std::forward(f), std::forward(args)...));

 

首先,最外层make_shared是创建一个智能指针这没什么可说的。这里的std::packaged_task会根据前面推导出的类型创建出一个future对象,后面的bind是将这个函数和后面的可变参数绑定起来。这样在函数调用的时候就可以获取到参数了。

接着是创建Task类型的智能指针,并将刚刚创建好的函数放到Task结构中的m_func中

 

TaskFunc fPtr = make_shared();
fPtr->m_func = [task](){
    (*task)();
};

 

上面用了一个Lambda表达式创建一个函数,并将这个函数赋值给了m_func,最终任务执行的其实就是这个Lambda表达式函数,在这个函数中才最终调用传进来的方法。此时,fPtr实际上就是一个Task对象,我们在类中重命名成了TaskFunc。接着将这个Task放到队列中,注意要加锁。最后将future对象返回出去。这意味着我们调用exec方法之后可以得到一个future对象。

exec方法是整个线程池中最复杂的部分了,涉及到很多C++的知识,后面有时间我会专门开几篇文章单独深入的去剖析这部分内容。

最后,我们来看一下其它的几个方法,首先是线程的停止,如下:

 

void stop() {
    {
        unique_lock lock(m_mutex);
        m_terminate = true;
        m_cond.notify_all();
    }


    for (auto & m_thread : m_threads) {
        if (m_thread->joinable()) {
            m_thread->join();
        }
        delete m_thread;
        m_thread = nullptr;
    }


    unique_lock lock(m_mutex);
    m_threads.clear();
}

 

这里我们使用了一对大括号将部分代码包起来了,这种用法其实是为了让锁更早的释放,unique_lock出了大括号就会被销毁,从而调用析构函数进而释放锁。接着是等待各个线程结束,其实就是将m_terminate置为true,run里面的死循环跳出循环,线程主函数返回。然后是清除各种资源。

最后我们实际用一下这个线程池,代码如下:

 

#include "thread-pool.hpp"
#include 


using namespace std;


void threadFunc(int a) {
   cout << "a=" << a << endl;
}


class A {
public:
    A() = default;
    int run(int a, int b) {
        return a + b;
    }
};


int main() {
    TPool p1;
    p1.init(10);
    p1.start();
    p1.exec(threadFunc, 100);
    p1.exec(threadFunc, 200);


    A a1;
    auto fu1 = p1.exec(std::bind(&A::run, &a1, std::_1, std::_2), 10, 20);
    int ret = fu1.get();
    std::cout << "res:" << ret << std::endl;


    p1.waitDone();
    return 0;
}

 

可以看到,除了使用方法外,我们还可以使用一个类方法作为线程的主函数,当然,主函数是一个模板函数,你可以传任意的类型,好,到这里我整个线程池就实现完了。

总结

这篇文章我们使用C++11新特性实现了一个通用的线程池,我们先是使用Golang写了一个简单的协程池,然后顺着相同的思路通过队列传参的形式实现了一个初级版本,但还没有结束,因为C++是支持面向对象编程的,所以我们又使用面向对象的方式实现了最终的版本。

当然,上面只是线程池实现的其中一种方式。并且很多C++相关的新特性也没有提到,比如thread_local,这部分内容还是需要你自己去探索了。






审核编辑:刘清

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分