本文摘录至《操作系统导论》第30章的相关内容
生产者消费者问题
所谓的生产者消费者(producer consumer
)问题,也被称为有界缓存区(bounded buffer
)问题,生产者将生成的数据放入缓冲区,消费者从缓冲区取走数据,以某种方式消费。
在很多实际的系统中存在这样的场景,比如多线程网络服务器中,一个生产者将HTTP请求放入工作队列,多个消费者线程从队列中取走请求进行处理。再比如在shell中使用管道连接不同程序的输入与输出,比如grep foo file.txt | wc -l
命令,grep
是生产者,而wc
是消费者。
因为有界缓冲区是共享资源,所以我们必须通过某种同步机制来访问有界缓冲区,以避免产生竞态条件。
有问题的方案
我们首先从一个简陋的方案开始,需要一个共享缓冲区,以便生产者存放数据,消费者取出数据,这里以一个整数作为缓冲区,两个函数分别实现将数值放入缓冲区,以及从缓冲区取走数值,相关代码为:
int buffer
int count = 0;
void put(int value)
{
assert(count == 0);
count = 1;
buffer = value;
}
int get()
{
assert(count == 1);
count = 0;
return buffer;
}
在以上的代码中,count
为1表示缓冲区已满,count
为0表示缓冲区为空。
现在我们需要一些操作,知道何时可以访问缓冲区,以便将数据放入缓冲区,或者从缓冲区读取数据。条件很明显,仅在count
为0时,即缓冲区为空时,才将数据放入缓冲区,仅在count
为1时,即缓冲区已满时,才从缓冲区取走数据,如果让生产者将数据放入已满的缓冲区,或者让消费者从已空的缓冲区获取数据,将发生错误。
生产者线程与消费者线程分别向缓冲区放入数据与取走数据,以下代码模拟了生产者线程与消费者线程:
void *producer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
put(i);
}
}
void *consumer(void *arg)
{
int i;
while(1)
{
int tmp = get();
printf("%d\n", tmp);
}
}
以上的生产者线程与消费者线程,显然没有考虑缓冲区作为共享资源会存在临界区的问题,给代码加锁是没有用的,需要条件变量,修改后的代码为:
cond_t cond;
mutex_t mutex;
void *producer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
if(count == 1)
{
pthread_cond_wait(&cond, &mutex);
}
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
if(count == 0)
{
pthread_cond_wait(&cond, &mutex);
}
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
以上的方案,存在两个严重的问题:
问题1
假设有两个消费者(消费者1与消费者2),一个生产者。
首先,消费者1开始执行,获取锁后,检查缓冲区为空,然后执行pthread_cond_wait
等待,进入睡眠,注意在此函数中会释放锁。
接着,生产者开始运行,其获取锁,检查缓冲区,发现缓冲区未满,则向缓冲区放入数值,然后生产者执行pthread_cond_signal
,发出信号,表示缓冲区已满,此时这个信号使得之前的消费者1不再睡眠,消费者1进入就绪队列,生产者发现缓冲区满后,进入睡眠。
此时,如果消费者2如果抢先得到运行,消费了缓冲区中的数据,然后消费者2进入睡眠,消费者1得到运行,消费者1则从pthread_cond_wait
中返回(注意,从此函数返回时,会获得锁),执行get
操作,但是此时缓冲区已经为空了,问题出现了!
显然,应该设法阻止消费1去读取缓冲区的数据,因为消费者2抢先将缓冲区的数据消费掉了。
产生问题的原因也很简单,在消费者1被生产者唤醒后,但是在消费者1运行之前,缓冲区的状态已经发生了改变(消费者2抢先消费了数据),因此生产者发出信号,只是唤醒了消费者1,暗示缓冲区的状态发生了变化,但是并不会保证在消费者1运行之前的状态一直都是期望的情况。
对此问题的解决方案很简单,将if
语句改为while
语句,表示当消费者1被唤醒后,应该立即再次检查共享变量,如果缓冲区此时为空,则消费者1应该回去继续睡眠,生产者需同样的修改,代码为:
cond_t cond;
mutex_t mutex;
void *producer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
while(count == 1)
{
pthread_cond_wait(&cond, &mutex);
}
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
while(count == 0)
{
pthread_cond_wait(&cond, &mutex);
}
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
问题2
假设两个消费者先运行,缓冲区为空,因此消费者1与消费者2都进入睡眠,生产者开始运行,在缓冲区放入数值后,发现信号,生产者进入睡眠。发出的信号唤醒了一个消费者,假设唤醒的是消费者1,此时消费者1处于就绪队列,而消费者2与生成者都处于睡眠中,并且都等待在同一个条件变量上。
消费者1得到运行,发现缓冲区为满,则消费者1消费了数据,然后消费者1发出了信号,在其睡眠之前,唤醒了一个正在等待的线程,即消费者2或者生产者。
问题的关键是,唤醒的是哪个线程?因为消费者1已经消费掉了缓冲区的数据,理论上应该唤醒生产者,但是如果唤醒的是消费者2呢?此时问题出现了,消费者2被唤醒后,发现缓冲区为空,又回去睡眠了,此时,生产者,消费者1,消费者2都处于睡眠状态了,大家都睡着了,显然这是个缺陷。
通过以上分析可知,消费者不应该唤醒消费者,而应该只唤醒生产者。对此问题的解决方案,就是使用两个条件变量,而不是一个。代码为:
cond_t empty;
cond_t fill;
mutex_t mutex;
void *producer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
while(count == 1)
{
pthread_cond_wait(&empty, &mutex);
}
put(i);
pthread_cond_signal(&fill);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg)
{
int i;
int loops = (int)arg;
for(i = 0; i < loops; i++)
{
pthread_mutex_lock(&mutex);
while(count == 0)
{
pthread_cond_wait(&fill, &mutex);
}
int tmp = get();
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
以上代码就可以解决了基于条件变量的生产者消费者问题。