模拟实现消息队列项目(系列5) -- 服务器模块(虚拟主机)

目录

前言

1. 创建VirtualHost

1.1 定义虚拟主机的相关属性

1.2 VirtualHost 构造方法 

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

1.3.2 队列操作 

1.4 绑定的创建和删除

1.5 发送消息到指定的队列/交换机

2. 实现路由规则Router

2.1 checkBindingKey()

2.2 checkRoutingKey()

2.3 route()

2.4 单元测试

3. 订阅消息

3.1 添加一个订阅者

3.2 创建订阅者管理类ConsumerManager

3.3 订阅消息小结

4. 消息确认 basicAck()

5. VirtualHost单元测试

结语


前言

        写到这里,内存和硬盘的数据就组织完毕了,接下来我们就会引入在消息队列初识中提出的一个概念 --- 虚拟主机.简单回顾一下虚拟主机的概念: 它类似于MySQL的database,是一个逻辑的集合,一个BrokerServer上可以存在多个VirtualHost.在一个BrokerServer上可以组织不同的数据,可以使用不同的虚拟主机做出逻辑上的区分.本章节就是进行进一步的封装,同时实现一些消息队列的API.这里需要注意的是在RabbitMq中,虚拟主机是可以随便创建和删除的,在本项目目前只是默认只有一个虚拟主机的存在,后续根据情况会进行扩展,这里也提前预留了对于多虚拟主机的管理的数据结构.保证了不同虚拟机中的交换机 队列 绑定 消息都是相互隔离的.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 创建VirtualHost

👇👇👇

注意: 这一块比较重要也比较复杂,所以将代码进行截图加标注的形式进行总结,完整的VirtualHost.class代码会在讲解完给出.

👆👆👆

1.1 定义虚拟主机的相关属性

Router: 是用来定义交换机转发的规则,主要实现的是对routingKey进行验证以及判断,具体的细节会在后面给出.

ConsumerManager: 实现的是管理消费者进行消费.

以上两者就是锁对象了,后续我们要对硬盘和内存进行数据的读写,为了保证操作的原子性,以及线程安全我们会给相关操作进行加锁. 

1.2 VirtualHost 构造方法 

主要就是传入虚拟主机的名字,对该虚拟主机的数据库以及文件信息进行初始化,主要是对数据库进行初始化.具体DataBaseManager.init()

初始化内容如下:

 初始化完成,将硬盘中的数据恢复到内存中

至此前置工作就差不多了.下面对一些重要的方法进行创建.

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

如果交换机不存在就进行创建,存在就直接返回(ExchangeDeclare)

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 判断交换机是否存在: 存在直接返回true即可,不存在就直接创建新的交换机即可.设置交换机的属性,根据是否持久化写入到硬盘,然后在写入到内存.这里需要注意的是,我们一定要先写硬盘再写内存,因为些硬盘是一个失败率很高的事情,经常会因为文件权限问题导致数据写入不进去.如果先写内存,而硬盘写入不进去,就还需要堆内存的数据进行删除,这就很繁琐了.
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/*** 1. 创建交换机* 如果交换机不存在就进行创建,存在就直接返回*/// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName,ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String, Object> arguments) {// 1. 更改交换机的名字 交换机的名字 = 虚拟主机 + 交换机exchangeName = virtualHostName + exchangeName;try{synchronized (exchangeLocker){// 2. 判定该交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange != null){System.out.println("[VirtualHost] 交换机已经存在!");return true;}// 3. 不存在,直接进行创建新的交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 4. 将构造好的交换机进行写入硬盘(含有持久化信息的交换机)  先写硬盘后写内存if (durable){diskDataCenter.insertExchange(exchange);}// 5. 将交换机写入到内存中memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成! exchangeName="+exchangeName);// 上述操作为什么不先写内存后写硬盘?// 因为写硬盘操作比较容易出现异常,如果写入硬盘失败,写入内存成功,再进行从内存中进行删除就比较麻烦了}return true;} catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败! exchangeName="+exchangeName);e.printStackTrace();return false;}}

删除交换机

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 根据交换机的名字得到交换机对象,判断交换机是否为空,不为空进行删除操作,还是先进行删除硬盘的数据,再删除内存中数据
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/*** 2.删除交换机* @param exchangeName 交换机名字* @return*/public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1. 先找到对应的交换机.Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if (toDelete == null) {throw new MqException("[VirtualHost] 交换机不存在无法删除!");}// 2. 删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3. 删除内存中的交换机数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}

1.3.2 队列操作 

针对队列创建和删除操作,这里就不做过多的解释了,过程跟上述交换机的操作一样. 下面给出代码:

/*** 3. 创建队列* @param queueName 队列名* @param durable 持久化* @param exclusive 队列独有* @param autoDelete 自动删除* @param arguments 其他声明* @return*/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {// 把队列的名字, 给拼接上虚拟主机的名字.queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 判定队列是否存在MSQueue existsQueue = memoryDataCenter.getQueue(queueName);if (existsQueue != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2. 创建队列对象MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 写硬盘if (durable) {diskDataCenter.insertQueue(queue);}// 4. 写内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}/*** 4. 删除队列* @param queueName 队列名* @return*/public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 根据队列名字, 查询下当前的队列对象MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);}// 2. 删除硬盘数据if (queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}// 3. 删除内存数据memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);e.printStackTrace();return false;}}

1.4 绑定的创建和删除

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,不为空抛出异常
  • 3. 绑定对象为空: 1, 判断绑定的bindingKey是否合法. 2.合法就创建绑定对象,设置响应的绑定属性.
  • 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.
  • 5. 写入硬盘,再写内存
  • 6. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.

 这一步我们在Router进行设置一个方法,等下面更加详细的介绍router类.

/*** 5. 创建绑定* @param queueName 队列名字* @param exchangeName 交换机名字* @param bindingKey 绑定规则* @return*/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {// 1. 转换交换机和队列的名字queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){// 2. 判断交换机和队列是否已经绑定成功Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);if (existBinding != null){throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);}// 3. 验证bing中的bindingKey 是否合法if (!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 4. 创建绑定对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 5. 获取对应的交换机和队列,判断是否是存在的MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 5. 先写硬盘if (queue.isDurable() && exchange.isDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}return true;}} catch (MqException e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}

删除绑定 

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,为空抛出异常
  • 3. 从硬盘进行删除,从内存进行删除
  • 4. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.这里需要注意的是,我们对交换机和队列进行加锁的时候,顺序要和创建绑定的顺序是一致的.不然会出现死锁的现象.
/*** 6. 删除绑定* @param queueName 队列名* @param exchangeName 交换机名字* @return*/public boolean queueUnbind(String queueName, String exchangeName) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 获取 binding 看是否已经存在~Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}

1.5 发送消息到指定的队列/交换机

发布消息其实就是把消息发送到指定的交换机中,然后根据绑定关系发送到指定的队列

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字 
  • 2. 检查消息的routingKey是否合法,不合法抛出异常
  • 3. 根据传入的交换机的名字进行查找交换机对象,然后判断交换机的类型,而进行下一步的行为.
  • 4. 如果交换机类型为DIRECT,则表示为直接交换机,则把routingKey作为队列的名字,先进行根据传入的参数,创建消息对象,然后按照刚才组合好的队列名字进行查找队列,查找队列进行发送消息,没查找进行抛出异常.发送消息的时候判断消息是否是持久化的,是持久化就往硬盘中写入,否则只写内存就可以.发送完消息之后,要进行重要的操作.通知消费者进行消费消息.这一块是在管理消费者进行消费消息实现的.
  • 5. 如果交换机类型为Fanout 或者 Topic 我们需要在Router中进行设置相应的路由规则.
/*** 9. 发送消息到指定的队列/交换机*/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 4. 判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.//    如果是 fanout, 所有绑定的队列都要转发的.//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上. 根据此条消息时是否要进行持久化进行判断int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}

2. 实现路由规则Router

 这个类我们实现具体的路由转发规则,对之前还没实现的方法进行实现.还未实现的方法具体如下:

1. 在创建绑定的时候我们对bindingKey进行验证是否合法checkBindingKey();

2. 在往交换机进行发送消息的时候,我们对消息的routingKey进行验证\checkRoutingKey();

3. 当消息插入到交换机之后,根据交换机的主题往队列中分发消息的时候.对不同主题的交换机实现不同的路由规则route();

以上是我们在虚拟主机类中还没有进行实现的方法.下面进行一一实现:

2.1 checkBindingKey()

 以下是我们合法的BindingKey的规则

/*** 验证bindingKey是否是合法的*     1. 数字, 字母, 下划线*     2. 使用 . 分割成若干部分*     3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.* @return*/public boolean checkBindingKey(String bindingKey){if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

2.2 checkRoutingKey()

验证routingKey是合法的.routingKey是与BindingKey进行匹配的,所以必须是具体的.        

 

/*** 验证routingKey是否是合法的*      1. 数字, 字母, 下划线*      2. 使用 . 分割成若干部分* @return*/public boolean checkRoutingKey(String routingKey){if (routingKey.length() == 0){// 空字符串,合法的情况  当交换机的类型为fanout的时候,是不需要的,所以可以设置为""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}

2.3 route()

判断交换机的类型进而得出是否可以进行给队列进行转发消息.

1. 交换机的类型为fanout.代表给交换机进行绑定的所有队列进行转发消息.

2. 交换机的类型为Topic,需要对routingKey进行判断.进而设置给队列转发消息

/***  判断是否可以给绑定的交换机进行转发消息* @return*/public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {if (type == ExchangeType.FANOUT){// 如果交换机类型为 fan-out 就直接进行返回true,表示转发给当前当前绑定的所有对列return true;}else if(type == ExchangeType.TOPIC){// 如果是主题交换机,规则就比较复杂return routerTopic(binding,message);}else {throw new MqException("[Router] 交换机类型有误 exchangeType=" + type);}}

 对于主题交换机,我们进行详细的讲解.

  • 1. 将bindingKey 和 routingKey 进行按照"."进行分割成字符串数组
  • 2. 定义下标进行遍历数组
  • 3. 遍历两个数组,主要分为5种情况.
    • 3.1  当bindingKey遇到*号时直接跳过*,两个下标都进行自增1
    • 3.2 当bindingKey遇到#号,如果此时#号是bindingKey的最后一位,那么直接返回true
    • 3.3 当bindingKey遇到#号,如果此时#号不是最后一位,就去匹配#号下一位在routingKey的部分,匹配到了就将routingIndex指到匹配的位置,进而在进行上述循环,如果没匹配到就返回false
    • 3.4 此时没有遇见通配符,所有的内容部都要进行匹配上,匹配不上就返回false
    • 3.5 最后判断此时两个数组的下标是否都比较到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
/*** 用来实现:topic类型的交换机的转发规则* @param binding  绑定信息对象* @param message  消息对象* @return*/private boolean routerTopic(Binding binding, Message message) {// 1. 将bindingKey 和 routingKey 进行按照"."进行分割String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 2. 定义用来遍历数组的下标int bindingIndex = 0;int routingIndex = 0;// 3. 进行遍历两个数组while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){if (bindingTokens[bindingIndex].equals("*")){// (1.)遇到*号两个下标直接跳过 * 可以匹配一个部分bindingIndex++;routingIndex++;}else if (bindingTokens[bindingIndex].equals("#")){bindingIndex += 1;// (2.)遇到#号   # 可以匹配多个部分if (bindingIndex == bindingTokens.length){// (3.)当遇到#号,#号的下标为最后一个元素的时候,直接返回true,因为可以直接匹配后面所有的内容return true;}else {// (4.)当遇到#号,后面后还有内容的时候,就去匹配#号下一个部分在routingKey的部分,// 匹配了就直接将bindingIndex指到bindingTokens下一个部分,同时将routingIndex指到匹配的地方// 没匹配配到就返回falseroutingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);if (routingIndex == -1){return false;}bindingIndex++;routingIndex++;}}else {// (5.)此时没有遇见通配符,所有的内容部都要进行匹配上if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}// (6.)最后判断此时两个数组的下标是否都比较到了末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}/*** 给定起始下标去在一个数组中寻找指定数组元素,找到就返回该元素在数组的下标,没找到就返回-1;* @param routingIndex   起始下标* @param routingTokens  目标数组* @param bindingToken   目标元素* @return*/private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)){return i;}}return -1;}

以上就是整个Router的所有方法.我们对上述代码进行单元测试.

2.4 单元测试

 

 

package com.example.demo.mqserver.core;import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试交换机的转发规则(交换机类型为topic)* User: YAO* Date: 2023-08-01* Time: 13:56*/
@SpringBootTest
class RouterTest {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}/***  [测试用例]*      binding key          routing key         result*      aaa                  aaa                 true*      aaa.bbb              aaa.bbb             true*      aaa.bbb              aaa.bbb.ccc         false*      aaa.bbb              aaa.ccc             false*      aaa.bbb.ccc          aaa.bbb.ccc         true*      aaa.*                aaa.bbb             true*      aaa.*.bbb            aaa.bbb.ccc         false*      *.aaa.bbb            aaa.bbb             false*      #                    aaa.bbb.ccc         true*      aaa.#                aaa.bbb             true*      aaa.#                aaa.bbb.ccc         true*      aaa.#.ccc            aaa.ccc             true*      aaa.#.ccc            aaa.bbb.ccc         true*      aaa.#.ccc            aaa.aaa.bbb.ccc     true*      #.ccc                ccc                 true*      #.ccc                aaa.bbb.ccc         true*/@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}

 单元测试通过.

3. 订阅消息

        在我们的虚拟主机中进行添加方法完成消息的订阅.要想完成消息的订阅,就需要在消息队列中新建一个列表consumerEnvList用来存储消费者的信息,当有消息进行存储到队列的时候,此时选出消费者进行消费消息.而消费者消费信息的这个环境需要单独定义一个类ConsumerEnv进行描述.以上这个消费信息的过程我们定义一个类ConsumerManager进行管理这些逻辑.

3.1 添加一个订阅者

给队列添加消费者,当队列接收到消息的时候,就要将消息推送给订阅者

public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

此处插入的参数Consumer相当于一个回调函数,就是一个函数式接口.我们在common中进行定义Consumer

@FunctionalInterface
public interface Consumer {// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}

定义这个回调函数表示:收到消息之后要对消息进行处理.

3.2 创建订阅者管理类ConsumerManager

1.  这个类是和虚拟主机是一一对应的,每个虚拟主机都有一个管理消费者的对象,而管理的消费者的对象对应的是与之对应的.

2. 我们采用一个堵塞队列来记录收到消息的的队列名字,每次队列收到消息,就会往这个队列中进行添加队列的名字,然后后续进行通知这个队列的消费者进行消费消息.

3. 单独使用一个线程池用来执行消息的回调.(主要是获取到消息之后,给响应设置消息的属性与消息本体发送给客户端.)

4. 我们设置一个扫描线程,从堵塞队列不断地取出元素,进而找到队列,在这个队列进行消费消息,并且设置扫描线程为后台线程,这样就不会阻止进程的结束.

public class ConsumerManager {// 1. 持有虚拟主机对象的引用,用来操作数据private VirtualHost parent;// 2. 指定一个线程池,负责执行具体的回调任务private ExecutorService workPool = Executors.newCachedThreadPool();// 3. 存放令牌的队列,存放接收到消息的队列名字(堵塞队列)// 当这个堵塞队列一接收到队列的名字,扫描线程就会就会找到虚拟主机,然后找到这个队列,进而消费消息private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();// 4. 扫描线程  (关注令牌队列中添加了哪些队列的名字,就知道哪些队列添加了消息,取出消息,进而交给线程池,进行消费这些消息)private Thread scannerThread = null;
}

1. 给堵塞队列设置接口,供虚拟主机进行调用.

/*** 1. 收到消息,通知消费者进行消费消息(将消息对应的队列名字添加到堵塞队列中)*/public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

2. 实现扫描线程

public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.// 后台线程不会影响进程的结束scannerThread.setDaemon(true);scannerThread.start();}

3. 添加消费者环境ConsumerEnv到指定的队列

我们在common中实现这个类

@Data
public class ConsumerEnv {// 1. 消费者的身份标识private String consumerTag;// 2. 消费者消费队列的名字private String queueName;// 3. 是否自动应答private boolean autoAck;// 4. 通过这个回调函数来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

(1) 按照指定的队列名找到这个类.

(2) 创建消费者环境对象,进行添加,同时如果这个队列的消息存在,就需要进行消费这些信息,调用consumeMessage()方法传入队列的名字.

/*** 2. 新增Consumer对象到指定的对列*/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

4. 消费消息 consumeMessage()

(1) 因为一个队列中可能会有多个消费者,我们按照轮询的方式进行挑选消费者进行消费消息,在队列的类中,设置方法chooseConsumer()

/*** 挑选订阅者 进行消费队列中的消息 (轮询的方式)* @return*/public ConsumerEnv chooseConsumer(){// 1. 如果当前队列对应的消费者的数量为0,直接返回null,表示没有筛选到消费者if (consumerEnvList.size() == 0){return null;}// 2. 使用当前订阅到的下标进行对消费者列表取模,然后进行挑选消费者记性消费消息,实现消息的轮询消费int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}

(2) 从队列中取出消息

(3) 把消息带入到回调方法,交给线程池进行执行

/*** 消费者进行消费信息* @param queue*/private void consumeMessage(MSQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());System.out.println(message);if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 此时是自动应答,表示直接删除// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageID());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

3.3 订阅消息小结

 4. 消息确认 basicAck()

 此处是消费者在回调函数中对消息进行处理之后再回调函数中执行的.

  • 1. 获取要删除消息以及所在队列的对象
  • 2. 删除硬盘和内存的数据
  • 3. 删除未确认消息集合的数据
/*** 消费者消费完消息进行手动应答* @return*/public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {// 1. 获取要删除消息以及所在队列的对象Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 确认的信息不存在 messageId="+messageId);}MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null){throw new MqException("[VirtualHost] 确认的队列不存在 queueName="+queueName);}// 2// 1.)删除硬盘中的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}// 2.) 删除消息中心的消息memoryDataCenter.removeMessage(message.getMessageID());// 3.) 删除委未确认消息集合的消息memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());System.out.println("[VirtualHost] basicAck成功 消息被确认成功  queueName=" + queueName+ ",messageId:." + messageId);return true;} catch (MqException | ClassNotFoundException | IOException e) {e.printStackTrace();System.out.println("[VirtualHost] basicAck失败 消息被确认失败  queueName=" + queueName+ ",messageId:." + messageId);return false;}}

至此以上就是VirtualHost的全部内容,内容很多,很繁琐需要,静下心来仔细的体会.

5. VirtualHost单元测试

 

package com.example.demo.mqserver;import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:虚拟主机的操作测试* User: YAO* Date: 2023-08-01* Time: 18:26*/
class VirtualHostTest {@Autowiredpublic VirtualHost  virtualHost = null;@BeforeEachvoid setUp() {DemoApplication.context = SpringApplication.run(DemoApplication.class);// 创建好虚拟主机对象virtualHost = new VirtualHost("default");}@AfterEachvoid tearDown() throws IOException {DemoApplication.context.close();//把硬盘的目录进行删除File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testvoid exchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);}@Testvoid exchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@Testvoid queueDeclare() {boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);}@Testvoid queueDelete() {boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@Testvoid queueBind() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");Assertions.assertTrue(ok);}@Testvoid queueUnbind() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");ok = virtualHost.queueUnbind("testQueue","testExchange");Assertions.assertTrue(ok);}@Testvoid basicPublish() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));Assertions.assertTrue(ok);}/*** 1. 先订阅, 后发布消息*/@Testpublic void testBasicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}/***  先发送消息, 后订阅队列.*/@Testpublic void testBasicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {// 创建一个交换机,并且绑定两个队列boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 发布消息发到交换机ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {// 1. 创建交换机(主题交换机)boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);// 2. 创建队列ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);// 3. 将交换机和队列进行绑定(设置bindingKey)ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);// 4. 发布消息(设置routingKey)ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);// 5. 订阅消息ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

结语

        本文将整个VirtualHost进行了实现,实现了供BrokerServer调用的API.基础的消息队列框架已经搭建好了,接下来就是搭建服务器和客户端了.请持续关注,谢谢!!!

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring Boot】Spring Boot项目的创建和文件配置

目录 一、为什么要学Spring Boot 1、Spring Boot的优点 二、创建Spring Boot项目 1、创建项目之前的准备工作 2、创建Spring Boot项目 3、项目目录的介绍 4、安装Spring Boot快速添加依赖的插件 5、在项目中写一个helloworld 三、Spring Boot的配置文件 1、配置文件的…

TCP/IP四层模型对比OSI七层网络模型的区别是啥?数据传输过程原来是这样的

一、TCP/IP四层模型对比OSI七层模型 它们两个定义的一些功能和协议都是差不多的。TCP/IP四层协议模型比我们的七层少了三层&#xff0c;把我们的数据链路层和物理层放在一层里面了&#xff0c;叫做数据链路层&#xff08;网络接口层&#xff09;&#xff0c;对应网络协议也没有…

本质安全设备标准(IEC60079-11)的理解(六)温度

本质安全设备标准&#xff08;IEC60079-11&#xff09;的理解&#xff08;六&#xff09;温度 本质安全设备的温度要求也是非常复杂的&#xff0c;首先在标准中涉及有3个温度的概念&#xff1a; 环境温度ambient temperature&#xff0c; 工作温度service temperature和最高表…

【Docker】数据库动态授权组件在Kubernetes集群下的测试过程记录

目录 背景 组件原理 测试设计 环境 测试脚本 脚本build为linux可执行文件 镜像构建 Dockerfile Docker build 镜像有效性验证 总结 资料获取方法 背景 我们都知道出于安全性考虑&#xff0c;生产环境的权限一般都是要做最小化控制&#xff0c;尤其是数据库的操作授…

【MFC】05.MFC第一大机制:程序启动机制-笔记

MFC程序开发所谓是非常简单&#xff0c;但是对于我们逆向人员来说&#xff0c;如果想要逆向MFC程序&#xff0c;那么我们就必须了解它背后的机制&#xff0c;这样我们才能够清晰地逆向出MFC程序&#xff0c;今天这篇文章就来带领大家了解MFC的第一大机制&#xff1a;程序启动机…

STM32 低功耗学习

STM32 电源系统结构介绍 电源系统&#xff1a;VDDA供电区域、VDD供电区域、1.8V供电区域、后备供电区域。 器件的工作电压&#xff08;VDD&#xff09;2.0~3.6V 为了提高转换精度&#xff0c;给模拟外设独立供电。电压调节器为1.8V供电区域供电&#xff0c;且1.8V供电区域是电…

Snapdrop手机电脑互传-无需下载App

软件介绍 Snapdrop&#xff1a;浏览器中的本地文件共享。灵感来自苹果的空投。 软件访问地址&#xff1a; Snapdrop官网地址 软件开源地址&#xff1a; github 软件截图

【自然语言处理】大模型高效微调:PEFT 使用案例

文章目录 一、PEFT介绍二、PEFT 使用2.1 PeftConfig2.2 PeftModel2.3 保存和加载模型 三、PEFT支持任务3.1 Models support matrix3.1.1 Causal Language Modeling3.1.2 Conditional Generation3.1.3 Sequence Classification3.1.4 Token Classification3.1.5 Text-to-Image Ge…

docker 安装hive

记录一下使用docker快速搭建部署hive环境 目录 写在前面 步骤 安装docker 安装docker安装docker-compose配置docker国内镜像源&#xff08;可选&#xff09; 安装git & 配置github部署Hive docker-hive开始部署 使用Hive命令行收尾工作 安装vi、lrzsz关闭相关命令 END…

NodeJS 个性化音乐推荐系统 毕业设-附源码00485

摘要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作规…

分布式问题

1. 分布式系统CAP原理 CAP原理&#xff1a;指在一个分布式系统中&#xff0c;Consistency&#xff08;一致性&#xff09;、Availability&#xff08;可用性&#xff09;、Partitontolerance&#xff08;分区容忍性&#xff09;&#xff0c;三者不可得兼。 一致性&#xff08;C…

Java并发 | 常见线程安全容器

文章目录 简介一、Hash表&#x1f6a3;1、ConcurrentHashMap1.1 内部实现原理1.2 并发操作方法1.3 ConcurrentHashMap与Hashtable的比较 二、集合&#x1f6a3;2、CopyOnWriteArrayList2.1 内部实现原理2.2 Copy-On-Write(COW)设计思想2.3 实操 三、Map&#x1f6a3;3、Concurr…

【深度学习_TensorFlow】手写数字识别

写在前面 到这里为止&#xff0c;我们已经学习完张量的常用操作方法&#xff0c;已具备实现大部分神经网络技术的基础储备了。这一章节我们将开启神经网络的学习&#xff0c;然而并不需要像学习前面那样了解大量的张量操作&#xff0c;而是将重点转向理解概念知识&#xff0c;…

2021年03月 Python(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

一、单选题(共25题,每题2分,共50分) 第1题 下列哪个操作不能退出IDLE环境? A:Alt+F4 B:Ctrl+Q C:按ESC键 D:exit() 正确的答案是:B:Ctrl+Q 解析:在IDLE环境中,Ctrl+Q组合键没有特定的功能,不会退出IDLE环境。要退出IDLE环境,可以使用exit()函数或者quit…

设计模式---工厂模式

1.什么是设计模式 软件设计模式&#xff08;Design pattern&#xff09;&#xff0c;又称设计模式&#xff0c;是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了可重用代码、让代码更容易被他人理解、保证代码可靠性、程序的重用性。 …

一篇文章教会你什么是Linux进程控制

Linux进程控制 进程创建1.fork函数初识1.1那么fork创建子进程时&#xff0c;操作系统都做了什么呢&#xff1f;1.2 父子进程和CPU中的EIP&#xff08;指令指针&#xff09;之间存在一定的关系1.3 fork的常规用法有哪些&#xff1f;1.4 fork调用失败的原因有哪些&#xff1f; 2.…

【小程序】Canvas 画布分享海报

成品效果图 可以通过切换下面图片形成不同的海报背景分享图 <template><view>// type"2d"必须加<canvas type"2d" :style"{width:Artwidth px,height:Artheight px, margin:0 auto}" canvas-id"firstCanvas"id&quo…

《Java-SE-第三十一章》之网络编程

前言 在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!” 博客主页&#xff1a;KC老衲爱尼姑的博客主页 博主的github&#xff0c;平常所写代码皆在于此 共勉&#xff1a;talk is cheap, show me the code 作者是爪哇岛的新手&#xff0c;水平很有限&…

Windows环境利用QT+CMake编译mingw版本的opencv

Opencv官网没有提供mingw版本的opencv库&#xff0c;所以需要自己编译&#xff0c;下面是编译过程&#xff0c;32位64位方法类似。 可以直接下载编译好的mingw版本opencv4.4&#xff1a; 使用CMAKE3.22QT5.13编译后的opencv4.4&#xff08;32位的&#xff09;资源-CSDN文库 …

Python实战之使用Python进行数据挖掘详解

一、Python数据挖掘 1.1 数据挖掘是什么&#xff1f; 数据挖掘是从大量的、不完全的、有噪声的、模糊的、随机的实际应用数据中&#xff0c;通过算法&#xff0c;找出其中的规律、知识、信息的过程。Python作为一门广泛应用的编程语言&#xff0c;拥有丰富的数据挖掘库&#…