需求:
创建消息队列时需要指定队列的容量上限,队列中没有消息时,消费者从队列中take元素会阻塞;队列中的消息数量达到容量上限时,生产者往队列中put元素会阻塞。要保证线程安全。
组成:
(1)消息(Message):
class Message{//消息idprivate int id;//消息携带的真实数据private Object value;public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic java.lang.String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}
}
(2)消息队列(MessageQueue):
//消息队列
class MessageQueue{//存储消息的容器,双向队列LinkedList<Message> messages = new LinkedList<>();//存储消息容器的容量private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;}//消费者线程获取消息的方法public Message take() {synchronized (messages) {//先判断容器中是否有消息while (messages.isEmpty()) {try {System.out.println("容器中没有消息,消费者线程陷入阻塞...");//容器中没有消息,线程进入Wait Set等待消息和被唤醒messages.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = messages.removeFirst();System.out.println("消费者线程拿走了一条消息...");//拿走后唤醒因容器消息到达上线而阻塞的生产者线程messages.notifyAll();return message;}}//生产者线程产生消息的方法public void produce(Message message){synchronized (messages){//判断容器的容量是否达到上限while (messages.size() == capacity){try {System.out.println("容器中的消息达到容量上限,生产者线程陷入阻塞...");//容器中的消息达到容量上限,则生产者线程进入Wait Set,待消费者线程拿走结果后唤醒生产者线程messages.wait();} catch (InterruptedException e) {e.printStackTrace();}}//没有达到上限则往容器中添加结果messages.addLast(message);System.out.println("生产者线程往容器中添加了一条消息...");//唤醒因容器中没有结果而陷入阻塞的消费者线程messages.notifyAll();}}
}
测试:
(1)设置队列容量上限为3,4个生产者同时生产一条消息放入队列,2s后,1个消费者来消费消息,看生产者线程在消费者take消息前是否会阻塞。
public class ProducerConsumer {public static void main(String[] args) {MessageQueue messageQueue = new MessageQueue(3);AtomicInteger atomicInteger = new AtomicInteger(1);//四个生产者线程生产四条消息for (int i = 0; i <= 3; i++) {new Thread(()->{//调用消息队列中的produce方法生产消息messageQueue.produce(new Message(atomicInteger.getAndIncrement(),UUID.randomUUID().toString() ));},"生产者-"+i).start();}try {//模拟过2s消费者来获取消息Thread.sleep(2000);new Thread(() -> {Message msg = messageQueue.take();System.out.println("消费者拿到了消息id为:" + msg.getId() +"的消息");}).start();} catch (InterruptedException e) {e.printStackTrace();}}
}
测试结果:
(2)先启动消费者线程,过2s后再启动生产者线程,看消费者线程是否会在生产者put消息前一直阻塞。
public class ProducerConsumer {public static void main(String[] args) {MessageQueue messageQueue = new MessageQueue(3);AtomicInteger atomicInteger = new AtomicInteger(1);new Thread(() -> {System.out.println("消费者线程尝试从队列中获取消息");long start = System.currentTimeMillis();Message msg = messageQueue.take();long end = System.currentTimeMillis();System.out.println("经过:" + (end - start) +"ms后,消费者线程拿到了消息"+msg.getId());},"Consumer Thread").start();try {//2s后,生产者生产消息Thread.sleep(2000);new Thread(() -> {System.out.println("生产者线程将消息放入队列中");messageQueue.produce(new Message(1,"消息" + atomicInteger.getAndIncrement()));},"Producer Thread").start();} catch (InterruptedException e) {e.printStackTrace();}}
}
测试结果: