(手写生产者消费者模型,写BlockingQueue较简便 )
1、背景
生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
如果缓冲区已经满了,则生产者线程阻塞;
如果缓冲区为空,那么消费者线程阻塞。
2、方式一:synchronized、wait和notify
add()和remove()方法是synchronized 的
package producerConsumer; //wait 和 notify public class ProducerConsumerWithWaitNofity {public static void main(String[] args) {Resource resource = new Resource();//生产者线程ProducerThread p1 = new ProducerThread(resource);ProducerThread p2 = new ProducerThread(resource);ProducerThread p3 = new ProducerThread(resource);//消费者线程ConsumerThread c1 = new ConsumerThread(resource);//ConsumerThread c2 = new ConsumerThread(resource);//ConsumerThread c3 = new ConsumerThread(resource); p1.start();p2.start();p3.start();c1.start();//c2.start();//c3.start(); }} /*** 公共资源类* @author **/ class Resource{//重要//当前资源数量private int num = 0;//资源池中允许存放的资源数目private int size = 10;/*** 从资源池中取走资源*/public synchronized void remove(){if(num > 0){num--;System.out.println("消费者" + Thread.currentThread().getName() +"消耗一件资源," + "当前线程池有" + num + "个");notifyAll();//通知生产者生产资源}else{try {//如果没有资源,则消费者进入等待状态 wait();System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");} catch (InterruptedException e) {e.printStackTrace();}}}/*** 向资源池中添加资源*/public synchronized void add(){if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个");//通知等待的消费者 notifyAll();}else{//如果当前资源池中有10件资源try{wait();//生产者进入等待状态,并释放锁System.out.println(Thread.currentThread().getName()+"线程进入等待");}catch(InterruptedException e){e.printStackTrace();}}} } /*** 消费者线程*/ class ConsumerThread extends Thread{private Resource resource;public ConsumerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生产者线程*/ class ProducerThread extends Thread{private Resource resource;public ProducerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {//不断地生产资源while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.add();}}}
3、方式二:lock和condition的await、signalAll
package producerConsumer;import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*** 使用Lock 和 Condition解决生产者消费者问题* @author tangzhijing**/ public class LockCondition {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition producerCondition = lock.newCondition();Condition consumerCondition = lock.newCondition();Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);//生产者线程ProducerThread2 producer1 = new ProducerThread2(resource);//消费者线程ConsumerThread2 consumer1 = new ConsumerThread2(resource);ConsumerThread2 consumer2 = new ConsumerThread2(resource);ConsumerThread2 consumer3 = new ConsumerThread2(resource);producer1.start();consumer1.start();consumer2.start();consumer3.start();} } /*** 消费者线程*/ class ConsumerThread2 extends Thread{private Resource2 resource;public ConsumerThread2(Resource2 resource){this.resource = resource;//setName("消费者"); }public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生产者线程* @author tangzhijing**/ class ProducerThread2 extends Thread{private Resource2 resource;public ProducerThread2(Resource2 resource){this.resource = resource;setName("生产者");}public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.add();}} } /*** 公共资源类* @author tangzhijing**/ class Resource2{private int num = 0;//当前资源数量private int size = 10;//资源池中允许存放的资源数目private Lock lock;private Condition producerCondition;private Condition consumerCondition;public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {this.lock = lock;this.producerCondition = producerCondition;this.consumerCondition = consumerCondition;}/*** 向资源池中添加资源*/public void add(){lock.lock();try{if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个");//唤醒等待的消费者 consumerCondition.signalAll();}else{//让生产者线程等待try {producerCondition.await();System.out.println(Thread.currentThread().getName() + "线程进入等待");} catch (InterruptedException e) {e.printStackTrace();}}}finally{lock.unlock();}}/*** 从资源池中取走资源*/public void remove(){lock.lock();try{if(num > 0){num--;System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + num + "个");producerCondition.signalAll();//唤醒等待的生产者}else{try {consumerCondition.await();System.out.println(Thread.currentThread().getName() + "线程进入等待");} catch (InterruptedException e) {e.printStackTrace();}//让消费者等待 }}finally{lock.unlock();}}}
4、方式三:BlockingQueue
package producerConsumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;//使用阻塞队列BlockingQueue解决生产者消费者 public class BlockingQueueConsumerProducer {public static void main(String[] args) {Resource3 resource = new Resource3();//生产者线程ProducerThread3 p = new ProducerThread3(resource);//多个消费者ConsumerThread3 c1 = new ConsumerThread3(resource);ConsumerThread3 c2 = new ConsumerThread3(resource);ConsumerThread3 c3 = new ConsumerThread3(resource);p.start();c1.start();c2.start();c3.start();} } /*** 消费者线程* @author tangzhijing**/ class ConsumerThread3 extends Thread {private Resource3 resource3;public ConsumerThread3(Resource3 resource) {this.resource3 = resource;//setName("消费者"); }public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.remove();}} } /*** 生产者线程* @author tangzhijing**/ class ProducerThread3 extends Thread{private Resource3 resource3;public ProducerThread3(Resource3 resource) {this.resource3 = resource;//setName("生产者"); }public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.add();}} } class Resource3{private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10);/*** 向资源池中添加资源*/public void add(){try {resourceQueue.put(1); //1当做生产和消费的Integer资源System.out.println("生产者" + Thread.currentThread().getName()+ "生产一件资源," + "当前资源池有" + resourceQueue.size() + "个资源");} catch (InterruptedException e) {e.printStackTrace();}}/*** 向资源池中移除资源*/public void remove(){try {resourceQueue.take();System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + resourceQueue.size() + "个资源");} catch (InterruptedException e) {e.printStackTrace();}} }