Java基础教程:多线程基础(3)——阻塞队列
快速开始
引入问题
生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。
模拟情景
这里我们实现如下的情况的生产-消费模型:
生产者不断交替地生产两组数据“姓名--1-->内容--1”,“姓名--2-->内容--2”,这里的“姓名--1”和“姓名--2”模拟为数据的名称,“内容--1 ”和“内容--2 ”模拟为数据的内容。
由于本程序中牵扯到线程运行的不确定性,因此可能会出现以下问题:
1.假设生产者线程刚向数据存储空间添加了数据的名称,还没有加入该信息的内容,程序就切换到了消费者线程,消费者线程把信息的名称和上一个信息的内容联系到了一起;
2.生产者生产了若干条数据,消费者才可以取数据,或者是,消费者取完一次数据后,还没等生产者放入新的数据,又重复取出了已取过的数据。
通过分析我们可知:
第一个问题可以通过同步来解决,第二个问题就需要用到线程通信。生产者线程放入数据后,通知消费者线程取出数据,消费者线程取出数据后,通知生产者线程生产数据,这里用wait\notigy机制来实现。
Java代码
定义信息类
package thread;public class Info {private String name = "name";private String content = "content";//设置标志位,用来进行线程通信private boolean flag =true;/*** 设置消息,此处用到线程同步* @param name* @param content*/public synchronized void set(String name,String content){while (!flag){try {super.wait();} catch (InterruptedException e) {e.printStackTrace();}}this.name=name; //设置名称try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}this.content=content; //设置内容flag =false; //设置标志位,表示现在生产停止,可以取走!}public synchronized void get(){while (flag){try {super.wait();} catch (InterruptedException e) {e.printStackTrace();}}try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(name +" --> " + content) ;flag = true ; // 改变标志位,表示可以生产super.notify();}}
定义生产者
public class Producer implements Runnable {private Info info=null;public Producer(Info info){this.info=info;}@Overridepublic void run() {boolean flag = true ; // 定义标记位for(int i=0;i<10;i++){if(flag){this.info.set("姓名--1","内容--1") ; // 设置名称flag = false ;}else{this.info.set("姓名--2","内容--2") ; // 设置名称flag = true ;}}}
}
定义消费者
public class Consumer implements Runnable {private Info info = null ;public Consumer(Info info){this.info = info ;}public void run(){for(int i=0;i<10;i++){this.info.get() ;}}public static void main(String[] args) {Info info = new Info(); // 实例化Info对象Producer pro = new Producer(info) ; // 生产者Consumer con = new Consumer(info) ; // 消费者new Thread(pro).start() ;//启动了生产者线程后,再启动消费者线程try{Thread.sleep(500) ;}catch(InterruptedException e){e.printStackTrace() ;}new Thread(con).start() ;}
}
使用阻塞队列来实现相同功能
引入BlockingQueue
任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题。Java通过BlockingQueue
提供了开箱即用的支持来控制这些方法的调用(一个线程创建资源,另一个消费资源)。java.util.concurrent
包下的BlockingQueue
接口是一个线程安全的可用于存取对象的队列。
BlockingQueue是一种数据结构,支持一个线程往里存资源,另一个线程从里取资源。这正是解决生产者消费者问题所需要的,那么让我们开始解决该问题吧。
Java代码
消息类
public class InfoPlus {private String name = "name";private String content = "content";public InfoPlus(String name, String content) {this.name = name;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}@Overridepublic String toString() {return "InfoPlus{" +"name='" + name + '\'' +", content='" + content + '\'' +'}';}
}
生产者
import java.util.concurrent.BlockingQueue;public class ProducerPlus implements Runnable {private BlockingQueue<InfoPlus> queue;public ProducerPlus(BlockingQueue<InfoPlus> queue) {this.queue = queue;}@Overridepublic void run() {for (int i=0;i<10;i++){try {Thread.sleep(1000);queue.put(new InfoPlus("name"+i,"content"+i));} catch (InterruptedException e) {e.printStackTrace();}}}
}
消费者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;public class ConsumerPlus implements Runnable{private BlockingQueue<InfoPlus> queue;public ConsumerPlus(BlockingQueue<InfoPlus> queue) {this.queue = queue;}public void run() {while (true) {try {System.out.println(this.queue.take());} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {BlockingQueue<InfoPlus> blockingQueue = new LinkedBlockingDeque<>();ProducerPlus producerPlus = new ProducerPlus(blockingQueue);ConsumerPlus consumerPlus = new ConsumerPlus(blockingQueue);ConsumerPlus consumerPlus1 = new ConsumerPlus(blockingQueue);new Thread(producerPlus).start();new Thread(consumerPlus).start();new Thread(consumerPlus1).start();}
}