一、线程池功能组件
总共包含三个组件:线程池、线程执行任务,任务详情。
线程池包含条件等待,锁,链中线程任务某一个,链中job队列中某一个。
线程执行任务包含线程池,当前线程,当前线程的的prev和next。
任务详情包含回调函数,回调函数的参数,当前job的prev和next。
二、代码编写流程
- 初始化线程池:ThreadPool pool
- 为线程池添加线程:参数是线程池指针,线程数量
2.1 如果线程数量小于1,赋值1
2.2 为线程池pool分配内存空间
2.3 创建条件信号量,将条件信号量赋值给线程池的条件信号量
2.4 创建锁,将锁赋值给线程池的锁
2.5 遍历线程数量,生成执行任务的结构体,并分配内存空间,同时初始化执行任务结构体
2.6 创建线程pthread_create(&worker->thread,NULL,ntyWorkThread,(void*)worker),将其指针给执行任务的线程,并且执行的方法是线程执行任务的方法,worker是线程方法的参数。通过worker的线程池的等待任务队列中,获取对应的job回调函数和回调函数的参数,并执行任务的回调函数,释放worker,线程退出 - 为线程池添加任务: 参数是线程池指针,和job结构体
3.1 加锁
3.2 将线程池添加job - 关闭线程池
4.1 遍历线程池,将所有执行线程设置终止条件设置为1
4.2 加锁
4.3 将线程池的当前work设置为NULL
4.4 线程池的等待队列job设置为NULL
4.5 广播信号量
4.6 解锁
main的执行顺序
- 初始化线程池结构体
- 初始化线程池
- 添加job
- 关闭线程池
三、具体代码
//
// Created by rosy on 2022/7/24.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#define MAX_NUM_THREADS 100
#define NUM_JOBS 1000
#define LL_ADD(item,list) do{ \
item->prev = NULL; \
item->next = list; \
list = item; \
} while(0)
#define LL_REMOVE(item,list) do { \
if(item->prev != NULL) item->prev->next = item->next; \
if(item->next != NULL) item->next->prev = item->prev; \
if(list == item) list = item->next; \
item->prev = item->next = NULL ;\
} while(0)
typedef struct NWORK{
pthread_t thread;
struct THREAD_POOL *thread_pool;
struct NWORK *prev;
struct NWORK *next;
int terminate;
} nWork;
typedef struct THREAD_POOL{
pthread_cond_t job_cond;
pthread_mutex_t job_mtx;
struct NWORK *works;
struct NJOB *jobs;
} thread_pool;
typedef struct NJOB{
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
static void *work_do_job(void *ptr){
nWork *work = (nWork*)ptr;
while(1) {
pthread_mutex_lock(&work->thread_pool->job_mtx);
while(work->thread_pool->jobs == NULL){
if(work->terminate){
break;
}
pthread_cond_wait(&work->thread_pool->job_cond,&work->thread_pool->job_mtx);
}
if(work->terminate){
pthread_mutex_unlock(&work->thread_pool->job_mtx);
break;
}
nJob *job = work->thread_pool->jobs;
if(job != NULL){ //移除当前job
LL_REMOVE(job,work->thread_pool->jobs);
}
pthread_mutex_unlock(&work->thread_pool->job_mtx);
if(job == NULL) continue;
job->job_function(job);
}
free(work);
pthread_exit(NULL);
}
int createThreadPool(thread_pool* pool,int num_works){
if(num_works < 1) {
num_works = 1;
}
memset(pool,0,sizeof(thread_pool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->job_cond,&blank_cond,sizeof(pool->job_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->job_mtx,&blank_mutex,sizeof(pool->job_mtx));
for(int i = 0; i < num_works;i++){
nWork *work = (nWork*)malloc(sizeof(nWork));
if(work == NULL){
perror("malloc work");
return 1;
}
memset(work,0,sizeof(nWork));
work->thread_pool = pool;
int ret = pthread_create(&work->thread,NULL,work_do_job,(void *)work);
if(ret){
perror("pthread_create");
free(work);
return 1;
}
LL_ADD(work,work->thread_pool->works);
}
return 0;
}
void add_job(thread_pool *pool,nJob* job){
pthread_mutex_lock(&pool->job_mtx);
LL_ADD(job,pool->jobs);
pthread_cond_signal(&pool->job_cond);
pthread_mutex_unlock(&pool->job_mtx);
}
void shut_down(thread_pool *pool){
nWork *work = NULL;
for(work = pool->works; work != NULL; work = work->next){
work->terminate = 1;
}
pthread_mutex_lock(&pool->job_mtx);
pool->works = NULL;
pool->jobs = NULL;
pthread_cond_broadcast(&pool->job_cond);
pthread_mutex_unlock(&pool->job_mtx);
}
void counter(nJob *job){
int index = *(int*)job->user_data;
printf("index:%d,selfid:%lu\n",index,pthread_self());
free(job->user_data);
free(job);
}
int main(int argc,char *argv[]) {
printf("hello yes\n");
thread_pool pool;
createThreadPool(&pool,MAX_NUM_THREADS);
printf("create thread pool\n");
for(int i = 0; i < NUM_JOBS; i++){
nJob *job = (nJob*)malloc(sizeof(nJob));
if(job == NULL){
perror("malloc");
exit(1);
}
job->job_function = counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
add_job(&pool,job);
}
printf("关闭...\n");
shut_down(&pool);
getchar();
printf("finish\n");
}