http://blog.csdn.net/gebushuaidanhenhuai/article/details/74011636
基本概念
提到生产者和消费者,我们最有可能想到的是商店卖东西,顾客在货架上(缓冲区)买东西。
生产者消费者问题,其实是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程—即所谓的“生产者”和“消费者”–在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放在缓冲区中,消费者在缓冲区消耗这些数据。但是,要保证生产者不会在缓冲区满时还往缓冲区写数据,消费者也不会在缓冲区为空时读数据。
三种关系
- 生产者与消费者之间是供求关系(互斥和同步)
- 生产者与生产者之间是竞争关系(互斥)
- 消费者与消费者之间是竞争关系(互斥)
我们简单解释一下三种关系。假如我们现在在一家超市,我们们想要买一箱牛奶。牛奶生产商(生产者)生产了牛奶,经超市工作人员把牛奶摆放在了货架上,在这个过程过我们(消费者)不能买牛奶,要等待工作人员摆好货物,所以此时生产者与消费者是互斥关系。工作人员摆好货物后,我们(消费者)去购买,此时生产者与消费者是同步关系。
一个货架上只能摆一个品牌的货物,怒能摆其他的,此时生产者与生产者之间是互斥关系。
两个或多个顾客不能同时买一个货物,此时消费者与消费者之间是互斥关系。
我们可以用两种方法实现生产者与消费者模型。
基于单链表的生产者消费者模型
我们用两个线程分别表示生产者与消费者,用单链表表示缓冲区。
生产者生产数据,插入到单链表的头部。
消费者消费数据,从单链表的头部读数据。
条件变量
条件变量是利用线程间共享的全局变量进行同步的一种机制,只要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立(给出条件成立信号)。
为了放置竞争,条件变量的使用总和一个互斥锁结合在一起。
条件变量的类型为 pthread_cond_t.
条件变量的初始化:
1.直接定义一个全局的条件变量,并利用宏PTHREAD_COND_INITIALIZER进行值得初始化。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2.调用函数pthread_cond_init
#include<pthread.h>
pthread_cond_init (pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
第一个参数即为我们动态分配的条件变量cond,除非创建一个非默认属性的条件变量,否则第二个参数attr始终为NULL;
注意:若不想讲条件变量定义成全局的,必须以动态分配的方式创建。
pthread_cond_t *cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
注意:使用此种方式,先destroy条件变量,再free这块空间。
3.销毁条件变量
int pthread_cond_destroy(pthread_cond_t* cond);
参数cond指针即指向我们创建的条件变量。
4.等待
我们使用pthread_cond_wait或pthread_cond_timewait函数等待条件变量变为真。
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
pthread _cond_wait,其第一个参数是指向条件变量的指针,第二个参数是一个指向互斥锁的指针。在上面提到过,条件变量总是和一把互斥锁协同使用的。目的是为了防止资源的竞争。
生产者与消费者之间是同步互斥关系的,他们不能同时访问缓冲区,所以我们需要一把锁来约束他们。
假如我们此时有两个消费者A,B在等待资源,生产者申请到了”锁“,并且生产了一个产品,释放锁。并发送信号告诉消费者你们可以来消费了。
假如消费者A 率先抢到锁,买走了产品。B再申请到锁时,发现已经没有产品了,只能等待条件变量为真时,买产品。此时锁在B身上,如果B一直在等待,一直不释放锁时,会造成生产者申请不到锁而造成“死锁”。所以wait的第二个参数就是当消费者在申请到锁时,条件变量为假时,及时的释放锁资源。
wait函数是无条件等待。在条件变量为假时,会一直等下去。timedwait是有条件等待,它多定义了一个超时,超时值定义了我们愿意等待多长时间。它通过timespec决定。
5.发送信号
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
当生产者生产完毕后需要通知消费者。发送信号有两种方式。signal是根据某种优先级唤醒一个等待者。broadcast是在资源充足的情况下进行广播,唤醒所有等待者。
代码实现:
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>typedef struct _list
{struct _list *next;int _val;
}product_list;product_list *head = NULL;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t need_product = PTHREAD_COND_INITIALIZER;void Init_list(product_list* list)
{if(list != NULL){list -> next = NULL;list -> _val = 0;}
}void* Consumer(void* _val)
{product_list *p = NULL;for(;;){pthread_mutex_lock(&lock);while(head == NULL){pthread_cond_wait(&need_product,&lock);}p = head;head = head -> next;p -> next = NULL;pthread_mutex_unlock(&lock);printf("Consum success,val is:%d\n",p -> _val);free(p);}return NULL;
}void* Product(void* _val)
{for(;;){sleep(rand() % 2);product_list* p =malloc(sizeof(product_list));pthread_mutex_lock(&lock);Init_list(p);p -> _val = rand() % 1000;p -> next = head;head = p;pthread_mutex_unlock(&lock);printf("Call consumer! Product has producted,val is:%d\n",p->_val);pthread_cond_signal(&need_product);}
}int main()
{pthread_t t_product;pthread_t t_consumer;pthread_create(&t_product,NULL,Product,NULL);pthread_create(&t_consumer,NULL,Consumer,NULL);pthread_join(t_product,NULL);pthread_join(t_consumer,NULL);return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
基于环形队列的生产者消费者模型
除了基于单链表的生产者与消费者模型,我们还可以利用信号量实现生产者消费者模型。
原理
生产者在空格子上生产数据。
消费者在有商品的格子上消费数据。
注意:
- 生产者先进行生产。
- 当消费者没有数据要消费时,需等待生产者生产。
- 当生产者把缓冲区充满时,需等待消费者消费,出现空格子时在生产。
操作函数
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
初始化信号量sem_init,参数value为信号量的值,参数pshared一般设为0,表示信号量用于同一进程内线程间同步。摧毁信号量sem_destroy。P操作(申请资源)sem_wait,使信号量的值-1。V操作(释放资源)sem_post,使信号量的值+1。sem_trywait是尝试申请资源。
代码实现
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>#define _SIZE_ 5
sem_t blanks;
sem_t datas;
int buf[_SIZE_] ={ 0 };
void* product(void* arg)
{int i = 0;while(1){usleep(500000);sem_wait(&blanks); int data = rand()%1000;buf[i] = data;printf("Product is:%d\n",data);sem_post(&datas); ++i;i %= _SIZE_;}
}
void* consumer(void* arg)
{int i = 0;while(1){usleep(500000); sem_wait(&datas); printf("Consumer is%d\n",buf[i]);sem_post(&blanks); ++i;i %= _SIZE_;}
}int main()
{sem_init(&blanks,0,_SIZE_);sem_init(&datas,0,0);pthread_t _consumer;pthread_t _product;pthread_create(&_consumer,NULL,consumer,NULL);pthread_create(&_product,NULL,product,NULL);pthread_join(_consumer,NULL);pthread_join(_product,NULL);sem_destroy(&blanks);sem_destroy(&datas);return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61