生产者消费者模型
一、 生产者消费者问题
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
.
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
二、 问题分析
该问题需要注意的几点:
- 在缓冲区为空时,消费者不能再进行消费
- 在缓冲区为满时,生产者不能再进行生产
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
- 注意条件变量与互斥锁的顺序
由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。
在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。
三、 伪代码实现
假设缓冲区大小为10,生产者、消费者线程若干。生产者和消费者相互等效,只要缓冲池未满,生产者便可将消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。
- items代表缓冲区已经使用的资源数,spaces代表缓冲区可用资源数
- mutex代表互斥锁
- buf[10] 代表缓冲区,其内容类型为item
- in、out代表第一个资源和最后一个资源
var items = 0, space = 10, mutex = 1;
var in = 0, out = 0;
item buf[10] = { NULL };producer {while( true ) {wait( space ); // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前wait( mutex ); // 保证在product时不会有其他线程访问缓冲区// productbuf.push( item, in ); // 将新资源放到buf[in]位置 in = ( in + 1 ) % 10;signal( mutex ); // 唤醒的顺序可以不同signal( items ); // 通知consumer缓冲区有资源可以取走}
}consumer {while( true ) {wait( items ); // 等待缓冲区有资源可以使用wait( mutex ); // 保证在consume时不会有其他线程访问缓冲区// consumebuf.pop( out ); // 将buf[out]位置的的资源取走out = ( out + 1 ) % 10;signal( mutex ); // 唤醒的顺序可以不同signal( space ); // 通知缓冲区有空闲位置}
}
不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了wait(space)
,此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items)
,而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex)
,这样两个线程都在阻塞,也就造成了死锁。
四、代码实现(C++)
#include <iostream>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;int current = 0; // producer运行加1,consumer运行减1
int buf[10];
int in = 0, out = 0;
int items = 0, spaces = 10;
bool flag; // 标记线程结束运行
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t notfull = PTHREAD_COND_INITIALIZER; // 缓冲区不满
pthread_cond_t notempty = PTHREAD_COND_INITIALIZER; // 缓冲区不空void *producer( void *arg ) {while( flag ) {pthread_mutex_lock( &mutex ); // 为保证条件变量不会因为多线程混乱,所以先加锁while( !spaces ) { // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒”pthread_cond_wait( ¬full, &mutex );}buf[in] = current++;in = ( in + 1 ) % 10;items++;spaces--;printf( "producer %zu , current = %d\n", pthread_self(), current );for( int i = 0; i < 10; i++ ) {printf( "%-4d", buf[i] );}printf( "\n\n" );pthread_cond_signal( ¬empty );pthread_mutex_unlock( &mutex );}pthread_exit( NULL );
}void *consumer( void *arg ) {while( flag ) {pthread_mutex_lock( &mutex );while( !items ) {pthread_cond_wait( ¬empty, &mutex );}buf[out] = -1;out = ( out + 1 ) % 10;current--;items--;spaces++;printf( "consumer %zu , current = %d\n", pthread_self(), current );for( int i = 0; i < 10; i++ ) {printf( "%-4d", buf[i] );}printf( "\n\n" );pthread_cond_signal( ¬full );pthread_mutex_unlock( &mutex );}pthread_exit( NULL );
}int main() {memset( buf, -1, sizeof(buf) );flag = true;pthread_t pro[10], con[10];int i = 0;for( int i = 0; i < 10; i++ ) {pthread_create( &pro[i], NULL, producer, NULL );pthread_create( &con[i], NULL, consumer, NULL );}sleep(1); // 让线程运行一秒flag = false;for( int i = 0; i < 10; i++ ) {pthread_join( pro[i], NULL );pthread_join( con[i], NULL );}return 0;
}
五、 互斥锁与条件变量的使用比较
我们会发现,在伪代码中强调了条件变量在前,互斥锁在后,而到了代码实现时又变成了先加互斥锁,再进行循环pthread_cond_wait()
。这不是自相矛盾吗?
其实,在伪代码中的wait()
、signal()
就是操作系统中的PV操作,而PV操作定义就保证了该语句是原子操作,因此在wait条件变量改变的时候不会因为多进程同时访问共享资源造成混乱,所以为了保证线程间的同步,需要先加条件变量,等事件可使用后才进行线程相应的操作,此时互斥锁的作用是保证共享资源不会被其他线程访问。
而在代码实现中,signal()
对应的时pthread_cond_wait()
函数,该函数在执行时会有三步:
- 解开当前的锁
- 等待条件变量达到所需要的状态
- 再把之前解开的锁加锁
为了实现将pthread_cond_wait()
变成原子操作,就需要在该函数之前添加互斥锁。因为pthread_cond_wait()
可以解锁,也就不会发生像伪代码所说的死锁问题。相反,如果像伪代码那样先使用条件变量,后加锁,则会造成多个线程同时访问共享资源的问题,造成数据的混乱。
欢迎关注微信公众号,不定时分享学习资料与学习笔记,感谢!