生产者消费者
与保护性暂停中的不同,不需要产生结果和消费结果的线程一一对应。
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
JDK 中各种阻塞队列,采用的就是这种模式
代码实现:
首先,设计消息队列类MessageQueue,需要指定容量capacity,用双向链表list作为容器。
提供take方法:检查list是否是空,空的话就wait,如果不空就打印Message,并唤醒所有线程。
提供put方法:检查list是否满了,满了的话就wait,如果不满就添加Message,并唤醒所有线程。
可以看到,以上的写法都是使用wait和notify的模板写法。
class MessageQueue{private LinkedList<Message> list = new LinkedList<>();private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;}//取public Message take(){synchronized (list){while (list.isEmpty()){try {log.debug("队列空了");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = list.removeFirst();log.debug("取出来了{}", message);list.notifyAll();return message;}}//存public void put(Message message){synchronized (list){while(list.size() == capacity){try {log.debug("队列满了");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.addLast(message);log.debug("添加了{}", message);list.notifyAll();}}
}
设计Message类,需要一个唯一标识id,还需要一个Object类型的value值。
final class Message{private int id;private Object value;public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}
}
完整代码:
public class ProductAndConsumer {public static void main(String[] args) {MessageQueue q = new MessageQueue(3);for (int i = 1; i <= 5; i++){//lambda表达式必须传final的值,不能变。int id = i;new Thread(()->{q.put(new Message(id, "v"+id));}, "生产者").start();}new Thread(()->{while (true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}Message t = q.take();}}, "消费者").start();}
}/*** 消息队列类*/
@Slf4j(topic = "c.test")
class MessageQueue{private LinkedList<Message> list = new LinkedList<>();private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;}//取public Message take(){synchronized (list){while (list.isEmpty()){try {log.debug("队列空了");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = list.removeFirst();log.debug("取出来了{}", message);list.notifyAll();return message;}}//存public void put(Message message){synchronized (list){while(list.size() == capacity){try {log.debug("队列满了");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.addLast(message);log.debug("添加了{}", message);list.notifyAll();}}
}/*** Message对象,只有get方法,并且final不可继承,也不会被子类重写方法,很安全。*/
final class Message{private int id;private Object value;public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}
}
某次的运行结果
21:17:06 [生产者] c.test - 添加了Message{id=5, value=v5}
21:17:06 [生产者] c.test - 添加了Message{id=2, value=v2}
21:17:06 [生产者] c.test - 添加了Message{id=3, value=v3}
21:17:06 [生产者] c.test - 队列满了
21:17:06 [生产者] c.test - 队列满了
21:17:07 [消费者] c.test - 取出来了Message{id=5, value=v5}
21:17:07 [生产者] c.test - 添加了Message{id=1, value=v1}
21:17:07 [生产者] c.test - 队列满了
21:17:08 [消费者] c.test - 取出来了Message{id=2, value=v2}
21:17:08 [生产者] c.test - 添加了Message{id=4, value=v4}
21:17:09 [消费者] c.test - 取出来了Message{id=3, value=v3}
21:17:10 [消费者] c.test - 取出来了Message{id=1, value=v1}
21:17:11 [消费者] c.test - 取出来了Message{id=4, value=v4}
21:17:12 [消费者] c.test - 队列空了