信号量
背景
信号量(semaphore)
-
抽象数据类型
一个整形(sem),两个原子操作
P():sem减1,如果sem<0,等待,否则继续
V():sem加1,如果sem<=0,唤醒一个等待的P
信号量使用
特性
-
信号量是被保护的整数变量
初始化完成后,只能通过P()和V()操作修改
由操作系统保证,PV操作是原子操作
P()可能阻塞,V()不会阻塞
-
通常假定信号量是”公平的“
线程不会被无限期阻塞在P()操作
假定信号量等待按先进先出排队
分类
二进制信号量:可以是0或1
一般/计数(资源)信号量:可取任何非负值
两者等价:基于一个可以实现另一个
使用
互斥访问:临界区的互斥访问控制
条件同步:线程间的事件等待
生产者-消费者问题
生成者->缓冲区->消费者
-
有界缓冲区的生产者-消费者问题描述
一个或多个生产者在生成数据后放在一个缓冲区里
单个消费者从缓冲区取出数据处理
任何时刻只能有一个生产者或消费者可访问缓冲区
-
问题分析
任何时刻只能有一个线程操作缓冲区(互斥访问)
缓冲区空时,消费者必须等待生产者(条件同步)
缓冲区满时,生产者必须等待消费者(条件同步)
-
用信号量描述每个约束
二进制信号量mutex
资源信号量fullBuffers
资源信号量emptyBuffers
class BoundedBuffer{
mutex =new Semaphore(1);
fullBuffers =new Semaphore(0);
emptyBuffers =new Semaphore(n);
}
BoundedBuffer::Producter(c){
emptyBuffers ->P();
mutex ->P();
ADD C TO THE BUFFER;
mutex ->V();
fullBuffers ->V();
}
BoundedBuffer::Consumer(c){
fullBuffers ->P();
mutex ->P();
REMOVE C FROM BUFFER;
mutex ->V();
emptyBuffers ->V();
}
信号量实现
- 实现
class Semaphore{
int sem;
WaitQueue q;
}
Semaphore: P(){
sem--;
if(sem<0){
ADD THIS THREAD t TO q;
block(q);
}
}
Semaphore: V(){
sem++;
if(sem<=0){
REMOVE a THREAD t FROM q;
wakeup(t);
}
}
-
困难
读/开发代码比较困难
容易出错。使用的信号量已经被另一个线程占用;忘记释放信号量
不能够处理死锁问题
管程(Moniter)
-
管程是一种用于多线程互斥访问共享资源的程序结构
采用面向对象方法,简化了线程间的同步控制
任一时刻最多只有一个线程执行管程代码
正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复
-
使用
在对象/模块中,收集相关共享数据
定义访问共享数据的方法
-
组成
一个锁:控制管程代码的互斥访问
0或者多个条件变量:管理共享数据的并发访问
-
条件变量
-
条件变量是管程内的等待机制
进入管程的线程因资源被抢占而进入等待状态
每个条件变量表示一种等待原因,对应一个等待队列
-
wait()操作
将自己阻塞在等待队列中
唤醒一个等待者或释放管程的互斥访问
-
signal()操作
将等待队列中的一个线程唤醒
如果等待队列为空,则等同空操作
-
条件变量实现
class Condition{
int numWaiting =0;
WaitQueue q;
}
Condition::Wait(lock){
numWaiting++;
ADD this thread t to q;
release(lock);
schedule();
require(lock);
}
Condition::Signal(){
if(numWaiting >0){
REMOVE a thread t from q;
wakeup(t);
numWaiting--;
}
}
- 管程解决生产者-消费者问题
class BoundedBuffer {
…
Lock lock;
int count = 0;
Condition notFull, notEmpty;
}
BoundedBuffer::Producter(c) {
lock->Acquire();
while (count == n)
notFull.Wait(&lock);
Add c to the buffer;
count++;
notEmpty.Signal();
lock->Release();
}
BoundedBuffer::Consumer(c) {
lock->Acquire();
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count--;
notFull.Signal();
lock->Release();
}
经典同步问题
读者-写者问题
-
共享数据的两类使用者
读者:只读取数据,不修改
写者:读取和修改数据
-
读者-写者问题描述:对共享数据的读写
读-读允许:同一时刻允许有多个读者同时读
读-写互斥:没有写者时读者才能读,没有读者时写者才能写
写-写互斥:没有其它写者时才能写
信号量方法
class Writer(){
P(WriteMutex);
write;
V(WriteMutex);
}
class Reader(){
P(CountMutex);
if(Rcount ==0)
P(WriteMutex);
++Rcount;
V(CountMutex);
read;
P(CountMutex);
--Rcount;
if(Rcount ==0)
V(WriteMutex);
V(CountMutex);
}
- 管程方法
AR = 0; // # of active readers
AW = 0; // # of active writers
WR = 0; // # of waiting readers
WW = 0; // # of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;
//读者
Private Database::StartRead() {
lock.Acquire();
while ((AW+WW) > 0) {
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Private Database::DoneRead() {
lock.Acquire();
AR--;
if (AR ==0 && WW > 0) {
okToWrite.signal();
}
lock.Release();
}
Public Database::Read() {
//Wait until no writers;
StartRead();
read database;
//check out – wake up waiting writers;
DoneRead();
}
//写者
Private Database::StartRead() {
lock.Acquire();
while ((AW+WW) > 0) {
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Private Database::DoneRead() {
lock.Acquire();
AR--;
if (AR ==0 && WW > 0) {
okToWrite.signal();
}
lock.Release();
}
Public Database::Read() {
//Wait until no writers;
StartRead();
read database;
//check out – wake up waiting writers;
DoneRead();
}
哲学家就餐问题
-
问题描述
-
5个哲学家围绕一张圆桌而坐
桌子上放着5支叉子
每两个哲学家之间放一支
-
哲学家的动作包括思考和进餐
进餐时需同时拿到左右两边的叉子
思考时将两只叉子放回原处
-