文章目录
- 一、虚拟机
- 二、关于消息的API
- 发布消息
- 直接交换机 DIRECT 转发规则
- 扇出交换机 FANOUT 转发规则
- 主题交换机 TOPIC 转发规则
- 匹配规则
- Router类
- 订阅消息
- 消费者
- 队列如何给订阅的消费者发送消息
- 自动发送消息至订阅者
- 应答消息
- 三、代码编写
一、虚拟机
接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用.
在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系.
不同虚拟机当然可以拥有相同名字的交换机等.
比如
虚拟机A中 拥有 交换机C,
虚拟机B中 拥有 交换机C,
咱们视为以上情况是合法的.
咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字.
即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.
当然也有其他方案,大家可以自由发挥.
二、关于消息的API
这是虚拟机要提供给上层的API
前6个API咱们已经写好了,只需要直接调用下层的API即可.
咱们现在来考虑后 7 - 9 这三个API的实现.
发布消息
发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.
绑定关系 Binding 中有一个 bindingKey 属性
消息 Message 中 有一个 routingKey 属性
下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.
直接交换机 DIRECT 转发规则
在直接交换机中,
bindingKye是无意义的,
routingKey是要转发到的队列的队列名.
直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.
扇出交换机 FANOUT 转发规则
在扇出交换机中,
bindingKye是无意义的,
routingKey是无意义的.
扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.
主题交换机 TOPIC 转发规则
在主题交换机中,
bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).
主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.
匹配规则
routingKey规则
- 由数字,字母,下划线组成
- 使用 . 将routingKey分成多个部分.
bindingKey规则
- 由数字,字母,下划线组成
- 使用 . 将routingKey分成多个部分.
- 支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的)
* 可以匹配任何一个独立的部分
# 可以匹配0个或多个的独立部分
匹配规则
Router类
在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配.
/*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/
public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb => 非法// 2. aaa.#.*.bbb => 非法// 3. aaa.*.#.bbb => 非法// 4. aaa.*.*.bbb => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型,则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException("[Router] 交换机类型非法! exchangeType="+exchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
}
订阅消息
消费者
咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字,
以及消息应答模式,以及回调函数.
回调函数是什么?
是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数
/*** 消费者的回调函数*/
@FunctionalInterface
public interface Consumer {// Delivery 是 “投递” 的意思,这个方法预期是在每次服务器收到消息之后,来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
/*** 表示一个消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true -> 自动应答, false -> 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}
队列如何给订阅的消费者发送消息
这里咱们要想清楚的是,一个队列可以有多个消费者,
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.
因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者,
/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();// 当前队列有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() == 0) {return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}
自动发送消息至订阅者
那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.
咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.
应答消息
应答消息共有两种模式.
- 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
- 手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)
三、代码编写
/*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/@Data
public class VirtualHost {private String virtualHostName;private Router router = new Router();private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker = new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker = new Object();// 消费者管理中心ConsumerManager consumerManager = new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// MemoryDataCenter 并不需要初始化,当 new MemoryDataCenter();时,里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作,去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在,直接从内存查询if (memoryDataCenter.getExchange(exchangeName) != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);return true;}// 2.创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功! exchangeName=" + exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange == null) {throw new MqException("[virtualHostName] 交换机不存在无法删除!");}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) {// 给队列名字加前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2.创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName = virtualHostName + queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue == null) {throw new MqException("[virtualHostName] 队列不存在无法删除!");}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println("[VirtualHost] 队列删除成功! queueeName=" + queueeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueeName=" + queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) != null) {return true;}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 2.创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() && queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName = virtualHostName + exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 2.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 5.构造 messageMessage message = Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBinding(exchangeName);for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {// 6.获取绑定对象,判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 7.构造消息Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息,即 把消息写入到 硬盘 和 内存 中int deliverMode = message.getDeliverMode();// deliverMode 为 1 不持久化,为 2 持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后,应答的方式 为 true 自动应答,为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功 queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume失败 queueName=" + queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {// 1.获取消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);e.printStackTrace();return false;}}}