多线程同步主要有信号量、互斥量、条件变量和读写锁四种方式。
0. 背景
竞争
#include <stdio.h>
#include <pthread.h>
void* func(void* arg){
printf("enter func\n");
sleep(1);
printf("do something\n");
sleep(1);
printf("level func\n");
}
int main(int argc,int argv[]){
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,&mutex);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
}
1. 信号量
1.1 操作
No. |
操作 |
函数 |
1 |
创建 |
int sem_init(sem_t *sem, int pshared, unsigned int value) |
2 |
销毁 |
int sem_destroy(sem_t *sem) |
3 |
阻塞等待 |
int sem_wait(sem_t *sem) |
4 |
非阻塞等待 |
int sem_trywait(sem_t * sem) |
5 |
触发 |
int sem_post(sem_t *sem) |
1.1.1 创建
int sem_init(sem_t *sem, int pshared, unsigned int value)
No. |
参数 |
含义 |
1 |
sem |
信号量对象 |
2 |
pshared |
信号量类型。0 :线程共享;<0 :进程共享 |
3 |
value |
初始值 |
No. |
返回值 |
含义 |
1 |
0 |
成功 |
1 |
-1 |
失败 |
1.1.2 销毁
int sem_destroy(sem_t *sem)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
-1 |
失败 |
1.2 等待
1.2.1 阻塞等待
int sem_wait(sem_t *sem)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
-1 |
失败 |
1.2.2 非阻塞等待
int sem_trywait(sem_t * sem)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
-1 |
失败 |
1.3 触发
int sem_post(sem_t *sem)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
-1 |
失败 |
#include <stdio.h>
#include <semaphore.h>
#include <pthread.h>
void* func(void* arg){
sem_wait(arg);
printf("enter func\n");
sleep(1);
printf("do something\n");
sleep(1);
printf("level func\n");
sem_post(arg);
}
int main(int argc,int argv[]){
sem_t sem;
sem_init(&sem,0,1);
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,&sem);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
sem_destroy(&sem);
}
2. 互斥量
2.1 分类
No. |
分类 |
实现 |
特点 |
1 |
静态分配互斥量 |
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER; |
简单 |
2 |
动态分配互斥量 |
pthread_mutex_init(&mutex, NULL);pthread_mutex_destroy(&mutex); |
可以设置更多的选项 |
2.2 操作
No. |
操作 |
函数 |
1 |
加锁 |
int pthread_mutex_lock(pthread_t *mutex) |
2 |
尝试加锁 |
int pthread_mutex_trylock(pthread_t *mutex) |
3 |
解锁 |
int pthread_mutex_unlock(pthread_t *mutex) |
2.3 示例
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
void* func(void* arg){
pthread_mutex_lock(&mutex);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_mutex_unlock(&mutex);
}
int main(int argc,int argv[]){
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,NULL);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
}
#include <stdio.h>
#include <pthread.h>
void* func(void* arg){
pthread_mutex_lock(arg);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_mutex_unlock(arg);
}
int main(int argc,int argv[]){
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,&mutex);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
pthread_mutex_destroy(&mutex);
}
2.4 可能出现的问题
- 线程在解锁之前退出。
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
void* func(void* arg){
pthread_mutex_lock(&mutex);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
pthread_exit(0);// 提前退出
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_mutex_unlock(&mutex);
}
int main(int argc,int argv[]){
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,NULL);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
}
- 线程在解锁之前被其他线程杀掉。
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
void* func(void* arg){
pthread_mutex_lock(&mutex);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_mutex_unlock(&mutex);
}
void* hacker(void* arg){
sleep(1);
pthread_t* tids = arg;
pthread_cancel(tids[0]);
printf("Kill TID:%ld\n",tids[0]);
}
int main(int argc,int argv[]){
void* (*funcs[])(void*) = {func,func,func,hacker};
pthread_t tids[4];
int i;
for(i=0;i<4;i++){
pthread_create(&tids[i],NULL,funcs[i],tids);
}
for(i=0;i<4;i++){
pthread_join(tids[i],NULL);
}
}
- 更加安全的做法
使用pthread_cleanup
,保证线程正常或者异常退出都能释放互斥锁。
线程在解锁之前退出解决方式
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
void* func(void* arg){
pthread_cleanup_push(pthread_mutex_unlock,&mutex);
pthread_mutex_lock(&mutex);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
pthread_exit(0);// 提前退出
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_mutex_unlock(&mutex);
pthread_cleanup_pop(0);
}
int main(int argc,int argv[]){
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,NULL);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
}
线程在解锁之前被其他线程杀掉解决方式
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
void* func(void* arg){
pthread_cleanup_push(pthread_mutex_unlock,&mutex);
pthread_mutex_lock(&mutex);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_exit(0);
pthread_cleanup_pop(0);
}
void* hacker(void* arg){
sleep(1);
pthread_t* tids = arg;
pthread_cancel(tids[0]);
printf("Kill TID:%ld\n",tids[0]);
}
int main(int argc,int argv[]){
void* (*funcs[])(void*) = {func,func,func,hacker};
pthread_t tids[4];
int i;
for(i=0;i<4;i++){
pthread_create(&tids[i],NULL,funcs[i],tids);
}
for(i=0;i<4;i++){
pthread_join(tids[i],NULL);
}
}
动态分配互斥量使用方式,也使用pthread_cleanup
#include <stdio.h>
#include <pthread.h>
void* func(void* arg){
pthread_cleanup_push(pthread_mutex_unlock,arg);
pthread_mutex_lock(arg);
printf("%ld enter func\n",pthread_self());
sleep(1);
printf("%ld do something\n",pthread_self());
sleep(1);
printf("%ld level func\n",pthread_self());
pthread_exit(0); // 退出才能出发clean_up
pthread_cleanup_pop(0);
}
int main(int argc,int argv[]){
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);
pthread_t tids[3];
int i;
for(i=0;i<3;i++){
pthread_create(&tids[i],NULL,func,&mutex);
}
for(i=0;i<3;i++){
pthread_join(tids[i],NULL);
}
pthread_mutex_destroy(&mutex);
}
2.5 基本套路
在线程处理函数中使用互斥量的基本套路
pthread_cleanup_push(pthread_mutex_unlock,pmutex);
pthread_mutex_lock(pmutex);
// do something
pthread_exit(0); // 退出才能出发clean_up
pthread_cleanup_pop(0);
2.6 信号量与互斥量的区别
No. |
区别 |
信号量 |
互斥量 |
1 |
使用对象 |
线程和进程 |
线程 |
2 |
量值 |
非负整数 |
0或1 |
3 |
操作 |
PV操作可由不同线程完成 |
加锁和解锁必须由同一线程使用 |
4 |
应用 |
用于线程的同步 |
用于线程的互斥 |
- 互斥:主要关注于资源访问的唯一性和排他性。
- 同步:主要关注于操作的顺序,同步以互斥为前提。
3 条件变量
3.1 分类
No. |
分类 |
实现 |
特点 |
1 |
静态分配条件变量 |
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; |
简单 |
2 |
动态分配静态变量 |
pthread_cond_init(&cond, NULL);pthread_cond_destroy(&cond); |
可以设置更多的选项 |
3.2 操作
No. |
操作 |
函数 |
1 |
条件等待 |
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) |
2 |
计时等待 |
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime) |
3 |
单个激活 |
int pthread_cond_signal(pthread_cond_t *cond) |
4 |
全部激活 |
int pthread_cond_broadcast(pthread_cond_t *cond) |
3.2.1 等待
3.2.1.1 条件等待
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
No. |
参数 |
含义 |
1 |
cond |
条件变量 |
2 |
mutex |
互斥锁 |
3.2.1.2 计时等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
No. |
参数 |
含义 |
1 |
cond |
条件变量 |
2 |
mutex |
互斥锁 |
3 |
abstime |
等待时间 |
No. |
返回值 |
含义 |
1 |
ETIMEDOUT |
超时结束等待 |
3.2.2 激活
3.2.2.1 单个激活
int pthread_cond_signal(pthread_cond_t *cond)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
正数 |
错误码 |
3.2.2.2 全部激活
int pthread_cond_broadcast(pthread_cond_t *cond)
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
正数 |
错误码 |
套路
条件变量一般与互斥锁一起使用。
pthread_mutex_lock(&mutex);
// do something
if(判断条件){
pthread_cond_signal(&cond);// 唤醒单个
// 或者
pthread_cond_broadcast(&cond);// 唤醒多个
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
while(判断条件){
pthread_cond_wait(&cond,&mutex);
}
// do something
// 把判断条件改为false
pthread_mutex_unlock(&mutex);
流程分析
- 主线程:等待子线程发信号。
- 子线程:每隔3秒计数一次,当数字是3的倍数时通知主进程。
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdbool.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
bool condition = false;
#define LOG(msg) printf("%s:%s\n",__func__ ,msg);
void* ChildFunc(void* arg){
int i = 0;
while(true){
LOG("Enter");
pthread_mutex_lock(&mutex);
LOG("Get Lock");
printf("%d\n",++i);
if(0 == i%3){
condition = true;
LOG("Begin Send Single");
pthread_cond_signal(&cond);// 唤醒单个
LOG("End Send Single");
}
sleep(3);
pthread_mutex_unlock(&mutex);
LOG("Lose Lock");
LOG("Leave");
}
}
void MainFunc(){
while(true){
LOG("Enter");
pthread_mutex_lock(&mutex);
LOG("Get Lock");
while(!condition){
LOG("Begin Wait Single");
pthread_cond_wait(&cond,&mutex);
LOG("End Wait Single");
}
condition = false;
pthread_mutex_unlock(&mutex);
LOG("Lose Lock");
LOG("Leave");
}
}
int main(){
pthread_t tid;
pthread_create(&tid,NULL,ChildFunc,NULL);
MainFunc();
pthread_join(tid,NULL);
return 0;
}
问题:
- 主线程在
pthread_cond_wait()
时,是否释放互斥锁?
- 主线程在什么时候重新获得互斥锁?
案例
模拟放票和抢票:实现生产者消费者问题
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int num = 0;
void* GetTicket(void*){
while(true){
// 记录资源释放操作
pthread_cleanup_push((void (*)(void*))pthread_mutex_unlock,&mutex);
pthread_mutex_lock(&mutex);
if(0<num){ // 二次检查
--num;
cout << pthread_self() << "取走1张票,还剩" << num << "张票" << endl;
}else{
cout << pthread_self() << " exit" << endl;
pthread_exit(0);// 只能使用pthread_exit()
}
pthread_mutex_unlock(&mutex);
usleep(100000); // 模拟耗时操作
pthread_cleanup_pop(0);// 调用pthread_exit()后线程退出自动释放资源
}
}
int main(){
num = 100; // 必须提前存入车票
pthread_t tids[5];
for(int i=0;i<5;++i){
pthread_create(tids+i,NULL,GetTicket,NULL);
}
// 主线程放票
for(int i = 0;i<5;++i){
pthread_mutex_lock(&mutex);
num += 40;
cout << "主线程放票"<<num<<"张,现存" << num <<"票" << endl;
pthread_mutex_unlock(&mutex);
usleep(100000); // 模拟耗时操作
}
for(int i=0;i<5;++i){
pthread_join(tids[i],NULL);
}
}
- 初始票数不能为0,否则抢票线程刚开始回退出。
- 抢票过程票数不能为0,否则抢票线程刚开始回退出。
可以使用条件变量使抢票线程在票数为0时阻塞等待。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
struct Data{
pthread_mutex_t* pmutex;
pthread_cond_t* pcond;
int* pnum;
int* pcount;
};
void* GetTicket(void* arg){
Data* pdata = (Data*)arg;
while(true){
// 记录资源释放操作
pthread_cleanup_push((void (*)(void*))pthread_mutex_unlock,pdata->pmutex);
pthread_mutex_lock(pdata->pmutex);
while(0>=*(pdata->pnum)){
if(0==*(pdata->pnum) && 0==(*pdata->pcount)){
cout << pthread_self() << " exit" << endl;
pthread_exit(0);// 只能使用pthread_exit()
}
pthread_cond_wait(pdata->pcond,pdata->pmutex);// 1.条件不满足阻塞,并且自动释放互斥锁
// 2.等待board_castt信号,重新抢夺互斥锁
}
--*(pdata->pnum);
cout << pthread_self() << "取走1张票,还剩" << *(pdata->pnum) << "张票" << endl;
pthread_mutex_unlock(pdata->pmutex);
usleep(100000); // 模拟耗时操作
pthread_cleanup_pop(0);// 调用pthread_exit()后线程退出自动释放资源
}
}
int main(){
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);
pthread_cond_t cond;
pthread_cond_init(&cond,NULL);
int num = 0;
int count = 5;
pthread_t tids[5];
Data data = {&mutex,&cond,&num,&count};
for(int i=0;i<5;++i){
pthread_create(tids+i,NULL,GetTicket,&data);
}
// 主线程放票
while(count--){
pthread_mutex_lock(&mutex);
num += 20;
cout << "主线程放票" << num << "张,现存" << num <<"票" << endl;
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond);// 唤醒所有在wait线程
sleep(1); // 模拟耗时操作
}
for(int i=0;i<5;++i){
pthread_join(tids[i],NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
老板与会计的约定:每笔超过1000元的支出必须老板批准同意,低于1000元的会计可以自行决定。
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int currency = 0 ;
int signal_cnt = 0;
void* checkout(void* arg){
sleep(1);
for(;;){
pthread_mutex_lock(&mutex);
printf("checkout enter\n");
currency = rand()%2000;
printf("spend %d\n",currency);
if(currency >= 1000){
printf("\033[42;31msignal boss:%d\033[0m\n");
pthread_cond_signal(&cond);
signal_cnt++;
}
printf("checkout leave\n");
pthread_mutex_unlock(&mutex);
//sched_yield();
sleep(1);
}
}
void* boss(void* arg){
for(;;){
pthread_mutex_lock(&mutex);
printf("boss enter\n");
while(currency < 1000){
printf("boss wait\n");
pthread_cond_wait(&cond,&mutex);
}
signal_cnt--;
printf("\033[46;31mboss agress:%d signal_cnt:%d\033[0m\n",currency,signal_cnt);
currency = 0;
printf("boss leave\n");
pthread_mutex_unlock(&mutex);
}
}
int main(){
typedef void*(*func_t)(void*);
func_t funcs[2] = {boss,checkout};
pthread_t tids[2];
int i;
pthread_setconcurrency(2);
for(i=0;i<2;i++){
pthread_create(&tids[i],NULL,funcs[i],tids);
}
for(i=0;i<2;i++){
pthread_join(tids[i],NULL);
}
}
问题
4. 读写锁
资源访问分为两种情况:读操作和写操作。
读写锁比mutex
有更高的适用性,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁。
- 当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞;
- 当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是以写模式对它进行枷锁的线程将阻塞;
- 当读写锁在读模式锁状态时,如果有另外线程试图以写模式加锁,读写锁通常会阻塞随后的读模式锁请求,这样可以避免读模式锁长期占用,而等待的写模式锁请求长期阻塞;
这种锁适用对数据结构进行读的次数比写的次数多的情况下,因为可以进行读锁共享。
新闻发布会
领导发言与聊天
4.1 分类
No. |
分类 |
实现 |
特点 |
1 |
静态分配读写锁 |
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER |
简单 |
2 |
动态分配读写锁 |
pthread_rwlock_init(&rwlock, NULL);pthread_rwlock_destroy(&rwlock); |
可以设置更多的选项 |
4.2 操作
4.2.1 加锁
4.2.1.1 读取锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
4.2.1.2 写入锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
4.2.2 解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock)
火车票的查询与购买
#include <stdio.h>
#include <pthread.h>
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int count = 10000;
int put_cur = 0;
void* search(void* arg){
for(;;){
pthread_rwlock_rdlock(&rwlock);
printf("leave %d\n",count);
usleep(500000);
pthread_rwlock_unlock(&rwlock);
}
}
void rollback(void* arg){
count -= put_cur;
printf("rollback %d to %d\n",put_cur,count);
pthread_rwlock_unlock(&rwlock);
}
void* put(void* arg){
pthread_cleanup_push(rollback,NULL);
for(;;){
pthread_rwlock_wrlock(&rwlock);
put_cur = rand()%1000;
count += put_cur;
printf("put %d ok,leave %d\n",put_cur,count);
usleep(500000);
pthread_rwlock_unlock(&rwlock);
}
pthread_cleanup_pop(0);
}
void* hacker(void* arg){
sleep(2);
pthread_t* ptids = arg;
pthread_cancel(ptids[0]);
printf("\033[41;34mcancel %lu\033[0m\n",ptids[0]);
}
void* get(void* arg){
for(;;){
pthread_rwlock_wrlock(&rwlock);
int cur = rand()%1000;
if(count>cur){
count -= cur;
printf("crash %d leave %d\n",cur,count);
}else{
printf("leave not enought %d\n",count);
}
usleep(500000);
pthread_rwlock_unlock(&rwlock);
}
}
int main(){
pthread_t tids[4];
typedef void*(*func_t)(void*);
func_t funcs[4] = {put,get,search,hacker};
int i=0;
pthread_setconcurrency(4);
for(i=0;i<4;i++){
pthread_create(&tids[i],NULL,funcs[i],tids);
}
for(i=0;i<4;i++){
pthread_join(tids[i],NULL);
}
}
四个线程:一个查看、一个放票、一个取票、一个随机破坏。