线程池的应用

描述

线程池的应用

在我认知中,任何网络服务器都是一个死循环。这个死循环长下面这个样子。

数据库

基本上服务器框架都是基于这个架构而不断开发拓展的。

这个死循环总共分为四个步骤,可以涵盖所有客户端的需求,然而目前绝大多数企业不会用这样的架构。

问题在于容易产生阻塞。

作为客户端,我们当然希望访问服务器的时候,能够在短时间内收到回复,意味着自己连接上了该服务器。但是上述架构却很容易产生响应延迟。

当某一个连接的2.3.4时间过长,也许是因为客户上传了很大的数据,也许是因为业务处理起来比较麻烦,需要计算很多东西,也许是客户需要下载很大的东西,总之只要2,3,4的时间延迟,意味着下一个循环处理其它连接的动作也会被无限延迟。

也就是说,系统需要处理一个很慢的客户端的连接,后面的所有连接,哪怕只是耗时很短的任务,都需要等这个很慢的任务完成才能进行。

所以除了像redis的服务器,数据库都是基于hash的key-value结构,业务处理起来十分快速,才会将1,2,3,4都在一个线程中完成,其它的服务器若要提供千万乃至亿级别的客户接入量,必须更快地处理客户的连接,解除1和234之间的耦合,这才引入了多线程,我的主线程只负责1,然后将2,3,4分发到其它线程中执行。

然而,如果服务器选择这种多线程架构,当我们面临着巨大的客户端流量,则势必需要频繁地创建和销毁线程,这个过程十分浪费系统资源,还容易造成系统崩溃,然后老板震惊,被迫毕业,流落街头,思之令人发笑。

解决办法就是线程池。

我们预先创建好一系列线程,就好比后宫佳丽三千,然后皇上(线程池中枢)来了兴致(收到任务),就去翻一个妃子(线程池中某个线程)的牌子。妃子(线程)解决完需求后,回到后宫(线程池),等待下一次召唤。

不用创建和销毁,而是回收利用,所有池式结构都可以看做是一种对资源调度的缓冲,这就是线程池的精髓。

线程池设计

我们手撕线程池,目的还是搞懂基本原理,不弄太多花里胡哨的架构,比如工厂模式之类的。

当前这个版本的线程池是基于互斥锁和条件变量实现的。

预告(画饼):无锁线程池后续也会手撕。

线程池总体上可以分为三大组件。

  • 任务队列(存还没有执行的任务)
  • 执行队列(可以看成就是线程池,存放着可以用来执行任务的线程)
  • 线程池管理中枢(负责封装前两个类,任务的分发,线程池的创建,销毁,等等。对外提供统一的接口)

其工作流程大概如图所示

数据库

任务队列节点数据结构

//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

任务队列负责存还没有执行的业务,我们可以将每个业务都抽象成一个函数,每个函数自然有可能需要参数。

所以任务队列的节点需要两个成员:

  • taskCallback:函数回调,执行客户端想要的业务。
  • user_data:函数参数,包含客户端的信息,比如socketfd等。

顺便提供了一个接口,可以修改回调函数。

执行队列节点数据结构

//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
  • tid:每个节点都对应一个线程,所以需要一个id成员来存线程id,
  • usable:这个成员非常妙,它代表当前线程是否可用,默认为true,一旦设置为false,则该线程会结束。
  • 使用usable可以在最后销毁线程池的时候,以一种优雅的方式结束每个线程,而代替pthread_cancel这种强制销毁线程的方式,因为你不知道线程中的任务是否处理完,强制销毁就会使某些业务中断。
  • pool:这个成员是指向中枢管理(后面会讲)的指针,主要是为了在每个线程中通过pool获取到一个全局(对于所有线程池线程共享)的互斥锁和条件变量。
  • start:是线程池对象执行的一个实现线程再回收利用的任务循环。具体实现代码也是在后面会讲。

线程池管理中枢设计

总体结构:

class ThreadPool{
public:
//-任务队列和执行队列
deque task_queue;
deque exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count);
//-创建线程池
void createPool();
//-加入任务
void push_task(void(*tcb)(void* arg),int i);
//-利用析构销毁线程池
~ThreadPool();
};*>*>

关于数据成员:

  • task_queue、exec_queue: 任务队列和执行队列,我使用deque作为容器实现队列。
  • cont:所有线程共享的条件变量。
  • mutex:所有线程共享的互斥锁。
  • thread_count: 线程池创建的时候,初始大小

关于成员方法:

  • ThreadPool:构造函数
  • createPool:创建线程池
  • push_task: 给服务器主循环用的,给线程池添加任务。
  • ~ ThreadPool: 销毁线程池,事实上应该单独定义一个destroyPool的api,我这里为了简便合并到析构中了。

ExecEle的start函数实现

现在对于ThreadPool对象有概念以后,可以先将刚刚执行队列节点ExecEle的start函数实现,其代表了每个线程池的线程始终在跑的循环,在无任务分配的时候阻塞在某个位置。

void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}

arg参数指向的是该线程函数对应的执行元素ExecEle本身的指针,我们定义其为ee。然后进入死循环,通过ee,我们可以获得线程池中枢对象pool。

通过pool。我们可以获得任务队列的情况,当任务队列为空,则线程进入阻塞状态,等待任务队列有任务进来后,通过条件变量通知,再恢复执行。

恢复执行后,从任务队列中取出队首的任务,这个过程需要在mutex的范围内,保证独占性。

之后解除互斥锁,开始执行任务的回调。执行完进行入下个循环,尝试再次获得互斥锁。

最后说一说usable,当我们销毁线程池的时候,设置每一个线程的usable为false,那么不会立刻中断每个线程正在执行的回调,而是等回调结束后,在下一次循环中如果检测到usable为false后,就会退出整个大循环,并释放自己的锁,唤醒线程池其它休眠的线程。退出大循环后,线程自然而优雅地结束。

之后是ThreadPool自己的api实现

构造函数ThreadPool实现:

ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}

主要为了初始化cont,mutex,thread_count。

创建线程池createPool实现:

void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){

通过pthread_create创建thread_count个线程,每个线程执行自己的start函数进入等待任务循环,并阻塞在锁和条件变量的位置。将exec对象push进执行队列。

添加任务 push_task实现:

void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);
}

主要功能是构造TaskEle对象并加入到执行队列中。

每个TaskEle可能需要执行不同的业务,所以push_task需要传入对应业务的回调tcb(task callback)

i是我加的额外参数,代表主线程中连接的客户端编号,其意义可以是socketfd。

注意在修改执行队列(push)的时候,需要加锁保证独占。

销毁线程池~ ThreadPool 实现:

~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){

先将所有线程的usable设置为false,之后加锁,清空任务队列,并通过条件变量通知所有线程,等所有线程退出后,销毁执行队列,销毁锁和条件变量。

业务代码和服务器主循环

//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}

随便写的线程执行的业务,打印一下客户信息。

主线程创建100大小的线程池,并添加1000个任务(连接)。

完整代码

// *C++和posix接口实现一个线程池
//-三个组件:任务队列,执行队列,线程池(中枢管理)

#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace std;

//-打印线程错误专用,根据err来识别错误信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}

class ThreadPool;//-声明

//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};

//-线程池
class ThreadPool{
public:
//-任务队列和执行队列
deque task_queue;
deque exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任务
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);

}
//-销毁线程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};

void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}


//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分