目录
- 前言
- 消费者类设计思路
- 核心API
- 总体代码
前言
我们上一篇博客,写了虚拟主机的实现, 在虚拟主机中需要用到俩个未实现的类,分别是验证绑定关键字和消费者类,接下来我们实现消费者类的核心代码
消费者类设计思路
在这个类中,首先我们要持有virtualHost对象来操作数据, 然后我们指定一个线程池负责具体的回调函数,通过一个扫描队列来不停的扫描所有的队列,看那个队列有新的消息,如果有就放到阻塞队列中去,消费者每次从阻塞队列中取出一个消息来响应。如果是多个消费者都订阅了一个消息,那么就使用轮询的方式来获取消息
核心API
属性
虚拟主机
线程池
阻塞 队列
扫描线程
方法
①往阻塞队列中添加消息
// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}
②订阅消息
我们的思路是,先找到对应的队列,然后去查看队列中是否有消息,如果有就要消费掉这些消息
//添加订阅者public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 1 找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}
③消费消息
关于消费消息,我们按照轮询的方式来依次消费
// 消费消息private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答, 就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}
总体代码
package com.example.demo.mqServer.core;import com.example.demo.Common.ConsumEnv;
import com.example.demo.Common.Consumer;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.VirtuaHost;import java.io.IOException;
import java.util.concurrent.*;/*
* 通过这个类, 来实现来实现消费者消费消息的核心功能
* */
public class ConsumerManager {// 持有上层对象 VirtualHost 调用 ,来操作数据private VirtuaHost virtuaHost;// 指定一个线程池, 负责执行具体的回调函数private ExecutorService workPool = Executors.newFixedThreadPool(4);// 存放令牌的队列 - 阻塞队列private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();// 扫描线程private Thread scannerThread = null;//public ConsumerManager(VirtuaHost virtuaHost) {this.virtuaHost = virtuaHost;scannerThread =new Thread(()->{while (true){try {String queueName = tokenQueue.take();MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}synchronized (queue){consumeMessage(queue);}} catch (InterruptedException e) {e.printStackTrace();} catch (MqException e) {e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}// 增加订阅public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 先找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答, 就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}}