线程池,它是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
在一个系统中,线程数过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。线程数过多会导致额外的线程切换开销。
线程的创建-销毁对系统性能影响很大:
线程池的应用场景:
线程池的应用例子:
作者最近在开发的过程中,也遇到多线程编程问题,跨平台,并发任务多,执行周期短。如果按照以往的反复的创建/销毁线程,显然不是一个很好的软件设计。我们需要利用线程池的方式来解决我们问题。
TP组件,又称线程池组件。是作者编写一个多线程管理组件,特点:
为了实现跨平台,需要将差异性接口抽象出来,我们整个组件需要抽象几个内容:①日志接口;②内存管理接口;③ 线程接口;④互斥量接口;⑤信号量接口。以CMSIS接口为例的实现:
typedef enum{
TP_EOK = 0, // There is no error
TP_ERROR, // A generic error happens
TP_ENOMEM, // No memory
TP_EINVAL, // Invalid argument
} TpErrCode;
#define TP_PRINT printf
#define TP_LOGE(...) TP_PRINT("33[31;22m[E/TP](%s:%d) ", __FUNCTION__, __LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#define TP_LOGI(...) TP_PRINT("33[32;22m[I/TP](%s:%d) ", __FUNCTION__, __LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#define TP_LOGD(...) TP_PRINT("[D/TP](%s:%d) ", __FUNCTION__, __LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("n")
#define TP_MALLOC malloc
#define TP_FREE free
// tp_def.h
typedef void *TpThreadId;
typedef void *(*tpThreadFunc)(void *argv);
typedef struct {
char *name;
uint32_t stackSize;
uint32_t priority : 8;
uint32_t reserver : 24;
} TpThreadAttr;
TpThreadId TpThreadCreate(tpThreadFunc func, void *argv, const TpThreadAttr *attr);
void TpThreadDelete(TpThreadId thread);
「参数」 | 「说明」 |
---|---|
func | 线程入口函数 |
argv | 线程入口函数参数 |
attr | 线程属性:线程名,栈空间,优先级 |
「返回」 | -- |
NULL | 创建失败 |
线程句柄 | 创建成功 |
「参数」 | 「说明」 |
---|---|
thread | 线程句柄 |
// tp_threa_adapter.c
#include "tp_def.h"
#include "cmsis_os2.h"
TpThreadId TpThreadCreate(tpThreadFunc func, void *argv, const TpThreadAttr *attr)
{
osThreadId_t thread = NULL;
osThreadAttr_t taskAttr = {
.name = attr->name,
.attr_bits = 0,
.cb_mem = NULL,
.cb_size = 0,
.stack_mem = NULL,
.stack_size = attr->stackSize,
.priority = (osPriority_t)attr->priority,
.tz_module = 0,
.reserved = 0,
};
thread = osThreadNew((osThreadFunc_t)func, argv, &taskAttr);
return (TpThreadId)thread;
}
void TpThreadDelete(TpThreadId thread)
{
if(thread != NULL) {
osThreadTerminate(thread);
}
}
// tp_def.h
typedef void *TpMutexId;
TpMutexId TpMutexCreate(void);
TpErrCode TpMutexLock(TpMutexId mutex);
TpErrCode TpMutexUnlock(TpMutexId mutex);
void TpMutexDelete(TpMutexId mutex);
「参数」 | 「说明」 |
---|---|
「返回」 | -- |
NULL | 创建失败 |
互斥量句柄 | 创建成功 |
「参数」 | 「说明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex无效参数 |
TP_ERROR | 获取互斥量失败 |
TP_EOK | 成功获取互斥量 |
「参数」 | 「说明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex无效参数 |
TP_ERROR | 释放互斥量失败 |
TP_EOK | 成功释放互斥量 |
「参数」 | 「说明」 |
---|---|
mutex | 互斥量句柄 |
// tp_mutex_adapter.c
#include "tp_def.h"
#include "cmsis_os2.h"
TpMutexId TpMutexCreate(void)
{
osMutexId_t mutex = NULL;
mutex = osMutexNew(NULL);
return (TpMutexId)mutex;
}
TpErrCode TpMutexLock(TpMutexId mutex)
{
if (mutex == NULL) {
return TP_EINVAL;
}
if(osMutexAcquire((osMutexId_t)mutex, osWaitForever) == osOK) {
return TP_EOK;
}
return TP_ERROR;
}
TpErrCode TpMutexUnlock(TpMutexId mutex)
{
if (mutex == NULL) {
return TP_EINVAL;
}
if(osMutexRelease((osMutexId_t)mutex) == osOK) {
return TP_EOK;
}
return TP_ERROR;
}
void TpMutexDelete(TpMutexId mutex)
{
if (mutex == NULL) {
return;
}
osMutexDelete(mutex);
}
// tp_def.h
typedef void *TpSemId;
TpSemId TpSemCreate(uint32_t value);
TpErrCode TpSemAcquire(TpSemId sem);
TpErrCode TpSemRelease(TpSemId sem);
void TpSemDelete(TpSemId sem);
「参数」 | 「说明」 |
---|---|
「返回」 | -- |
NULL | 创建失败 |
信号量句柄 | 创建成功 |
「参数」 | 「说明」 |
---|---|
sem | 信号量句柄 |
「返回」 | -- |
TP_EINVAL | sem无效参数 |
TP_ERROR | 获取信号量失败 |
TP_EOK | 成功获取信号量 |
「参数」 | 「说明」 |
---|---|
sem | 信号量句柄 |
「返回」 | -- |
TP_EINVAL | 信号量无效参数 |
TP_ERROR | 释放信号量失败 |
TP_EOK | 成功释放信号量 |
「参数」 | 「说明」 |
---|---|
sem | 信号量句柄 |
// tp_sem_adapter.c
#include "tp_def.h"
#include "cmsis_os2.h"
TpSemId TpSemCreate(uint32_t value)
{
osSemaphoreId_t sem = NULL;
sem = osSemaphoreNew(1, value, NULL);
return (TpSemId)sem;
}
TpErrCode TpSemAcquire(TpSemId sem)
{
if (sem == NULL) {
return TP_EINVAL;
}
if(osSemaphoreAcquire((osSemaphoreId_t)sem, osWaitForever) != osOK) {
return TP_ERROR;
}
return TP_EOK;
}
TpErrCode TpSemRelease(TpSemId sem)
{
if (sem == NULL) {
return TP_EINVAL;
}
if(osSemaphoreRelease((osSemaphoreId_t)sem) != osOK) {
return TP_ERROR;
}
return TP_EOK;
}
void TpSemDelete(TpSemId sem)
{
if (sem == NULL) {
return;
}
osSemaphoreDelete((osSemaphoreId_t)sem);
}
tp的提供的接口非常精简:创建线程池,增加任务到线程池,销毁线程池。
「参数」 | 「说明」 |
---|---|
pool | 线程池句柄 |
name | 线程池中线程名字 |
stackSize | 线程池中线程的栈大小 |
theadNum | 线程池中线程数目 |
「返回」 | -- |
TP_EINVAL | pool无效参数 |
TP_ERROR | 创建失败 |
TP_NOMEM | 内存不足 |
TP_EOK | 创建成功 |
TpErrCode TpCreate(Tp *pool, const char *name,
uint32_t stackSize, uint8_t threadNum)
{
int index = 0;
if(pool == NULL) {
TP_LOGE("Thread pool handle is NULL");
return TP_EINVAL;
}
// ①
if((pool->queueLock = TpMutexCreate()) == NULL) {
TP_LOGE("Create thread pool mutex failed");
return TP_ERROR;
}
// ②
if((pool->queueReady = TpSemCreate(0)) == NULL) {
TP_LOGE("Create thread pool sem failed");
return TP_ERROR;
}
pool->taskQueue = NULL;
pool->threadNum = threadNum;
pool->waitTaskNum = 0;
pool->threads = (TpThreadInfo *)TP_MALLOC(threadNum * sizeof(TpThreadInfo));
if(pool->threads == NULL) {
TP_LOGE("Malloc thread pool info memory failed");
return TP_ENOMEM;
}
// ③
for(index = 0; index < threadNum; index++) {
pool->threads[index].attr.name = (char *)TP_MALLOC(TP_THREAD_NAME_LEN);
if(pool->threads[index].attr.name == NULL) {
TP_LOGE("Malloc thread name memory failed");
return TP_ENOMEM;
}
snprintf(pool->threads[index].attr.name, TP_THREAD_NAME_LEN, "%s%d", name, index);
pool->threads[index].attr.stackSize = stackSize;
pool->threads[index].attr.priority = TP_THREAD_PRIORITY;
pool->threads[index].threadId = TpThreadCreate(TpThreadHandler, pool, &pool->threads[index].attr);
}
return TP_EOK;
}
「参数」 | 「说明」 |
---|---|
pool | 线程池句柄 |
handle | 线程池中线程名字 |
argv | 线程池中线程的栈大小 |
「返回」 | -- |
TP_EINVAL | pool无效参数 |
TP_NOMEM | 内存不足 |
TP_EOK | 增加task成功 |
TpErrCode TpAddTask(Tp *pool, taskHandle handle, void *argv)
{
TpTask *newTask = NULL;
TpTask *taskLIst = NULL;
if(pool == NULL) {
TP_LOGE("Thread pool handle is NULL");
return TP_EINVAL;
}
// ①
newTask = (TpTask *)TP_MALLOC(sizeof(TpTask));
if(newTask == NULL) {
TP_LOGE("Malloc new task handle memory failed");
return TP_ENOMEM;
}
newTask->handle = handle;
newTask->argv = argv;
newTask->next = NULL;
// ②
TpMutexLock(pool->queueLock);
taskLIst = pool->taskQueue;
if(taskLIst == NULL) {
pool->taskQueue = newTask;
}
else {
while(taskLIst->next != NULL) {
taskLIst = taskLIst->next;
}
taskLIst->next = newTask;
}
pool->waitTaskNum++;
TpMutexUnlock(pool->queueLock);
// ③
TpSemRelease(pool->queueReady);
return TP_EOK;
}
「参数」 | 「说明」 |
---|---|
pool | 线程池句柄 |
「返回」 | -- |
TP_EINVAL | pool无效参数 |
TP_EOK | 销毁成功 |
TpErrCode TpDestroy(Tp *pool)
{
int index = 0;
TpTask *head = NULL;
if(pool == NULL) {
TP_LOGE("Thread pool handle is NULL");
return TP_EINVAL;
}
// ①
for(index = 0; index < pool->threadNum; index++) {
TpThreadDelete(pool->threads[index].threadId);
pool->threads[index].threadId = NULL;
TP_FREE(pool->threads[index].attr.name);
pool->threads[index].attr.name = NULL;
}
// ②
TpMutexDelete(pool->queueLock);
pool->queueLock = NULL;
TpSemDelete(pool->queueReady);
pool->queueReady = NULL;
TP_FREE(pool->threads);
pool->threads = NULL;
// ③
while (pool->taskQueue != NULL) {
head = pool->taskQueue;
pool->taskQueue = pool->taskQueue->next;
TP_FREE(head);
}
pool = NULL;
return TP_EOK;
}
「参数」 | 「说明」 |
---|---|
argv | 线程池参数 |
static void *TpThreadHandler(void *argv)
{
Tp *pool = (Tp *)argv;
TpTask *task = NULL;
while(1) {
// ①
TpMutexLock(pool->queueLock);
// ②
while(pool->waitTaskNum == 0) {
TpMutexUnlock(pool->queueLock);
TpSemAcquire(pool->queueReady);
TpMutexLock(pool->queueLock);
}
// ③
task = pool->taskQueue;
pool->waitTaskNum--;
pool->taskQueue = task->next;
TpMutexUnlock(pool->queueLock);
task->handle(task->argv);
// ④
TP_FREE(task);
task = NULL;
}
}
#include "tp_manage.h"
Tp pool;
void TestTaskHandle(void *argv)
{
printf("%s--taskId: %drn", __FUNCTION__, (uint32_t)argv);
}
int main(void)
{
// ①
TpCreate(&pool, "tp", 1024, 3);
// ②
TpAddTask(&pool, TestTaskHandle, (void *)1);
TpAddTask(&pool, TestTaskHandle, (void *)2);
TpAddTask(&pool, TestTaskHandle, (void *)3);
TpAddTask(&pool, TestTaskHandle, (void *)4);
TpAddTask(&pool, TestTaskHandle, (void *)5);
TpAddTask(&pool, TestTaskHandle, (void *)6);
return 0;
}
[TP组件](https://gitee.com/RiceChen0/tp.git)
欢迎关注微信公众号『Rice嵌入式开发技术分享』
全部0条评论
快来发表一下你的评论吧 !