✔ ★Java项目——设计一个消息队列(五)【虚拟主机设计】

虚拟主机设计

  • 创建 VirtualHost
    • 实现构造⽅法和 getter
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发布消息 ★
    • 路由规则
      • 1) 实现 route ⽅法
      • 2) 实现 checkRoutingKeyValid
      • 3) 实现 checkBindingKeyValid
      • 4) 实现 routeTopic
      • 5) 匹配规则测试⽤例
      • 6) 测试 Router
    • 订阅消息
      • 1) 添加⼀个订阅者
      • 2) 创建订阅者管理管理类
      • 3) 添加令牌接⼝
      • 4) 实现添加订阅者
      • 给 MsgQueue 添加⼀个订阅者列表
      • 5) 实现扫描线程
      • 6) 实现消费消息
    • ⼩结
    • 消息确认
    • 测试 VirtualHost

在这里插入图片描述

在这里插入图片描述

创建 VirtualHost

创建 mqserver.VirtualHost
在这里插入图片描述
其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 这两个内容后续再介绍

实现构造⽅法和 getter

在这里插入图片描述

创建交换机

• 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) .
• 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在
同名的交换机或者队列了.
• exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”.
• 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了.

在这里插入图片描述

删除交换机

在这里插入图片描述

创建队列

在这里插入图片描述

删除队列

在这里插入图片描述

创建绑定

在这里插入图片描述

删除绑定

在这里插入图片描述

发布消息 ★

• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中.
• 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息.
◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中.
◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍.
• BasicProperties 是消息的元信息. body 是消息本体.

    // 发送消息到指定的交换机/队列中.public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 4. 判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.//    如果是 fanout, 所有绑定的队列都要转发的.//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}

路由规则

1) 实现 route ⽅法

    public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {// 根据不同的 exchangeType 使用不同的判定转发规则.if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.return routeTopic(binding, message);} else {// 其他情况是不应该存在的.throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);}}

2) 实现 checkRoutingKeyValid

    public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

3) 实现 checkBindingKeyValid

    // bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

4) 实现 routeTopic

    // [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         trueprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}

5) 匹配规则测试⽤例

// [测试用例]
// binding key          routing key         result
// aaa                  aaa                 true
// aaa.bbb              aaa.bbb             true
// aaa.bbb              aaa.bbb.ccc         false
// aaa.bbb              aaa.ccc             false
// aaa.bbb.ccc          aaa.bbb.ccc         true
// aaa.*                aaa.bbb             true
// aaa.*.bbb            aaa.bbb.ccc         false
// *.aaa.bbb            aaa.bbb             false
// #                    aaa.bbb.ccc         true
// aaa.#                aaa.bbb             true
// aaa.#                aaa.bbb.ccc         true
// aaa.#.ccc            aaa.ccc             true
// aaa.#.ccc            aaa.bbb.ccc         true
// aaa.#.ccc            aaa.aaa.bbb.ccc     true
// #.ccc                ccc                 true
// #.ccc                aaa.bbb.ccc         true

6) 测试 Router

package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.core.Router;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}// [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         true@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}

订阅消息

1) 添加⼀个订阅者

    // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

Consumer 相当于⼀个回调函数. 放到 common.Consumer 中

@FunctionalInterface
public interface Consumer {// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

2) 创建订阅者管理管理类

在这里插入图片描述
• parent ⽤来记录虚拟主机.
• 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
• 使⽤⼀个线程池⽤来执⾏消息回调.

这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了

3) 添加令牌接⼝

    // 这个方法的调用时机就是发送消息的时候.public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

4) 实现添加订阅者

• 新来订阅者的时候, 需要先消费掉之前积压的消息.
• consumeMessage 真正的消息消费操作, ⼀会再实现.

    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境

public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;// 通过这个回调来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}

给 MsgQueue 添加⼀个订阅者列表

在这里插入图片描述
此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮
流拿到消息

5) 实现扫描线程

在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作

    public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();}

6) 实现消费消息

    private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.

⼩结

⼀. 消费消息的两种典型情况

  1. 订阅者已经存在了, 才发送消息
    这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.
  2. 消息先发送到队列了, 订阅者还没到.
    此时当订阅者到达, 就快速把指定队列中的消息全都消费掉.
    ⼆. 关于消息不丢失的论证
    每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
    • 如果 autoAck 为 true
    消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
    • 如果 autoAck 为 false
    在回调内部, 进⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
  3. 执⾏消息回调的时候抛出异常
    此时消息仍然处在待确认队列中.
    此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.
    死信队列咱们此处暂不实现.
  4. 执⾏消息回调的时候服务器宕机
    内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.

消息确认

在这里插入图片描述

测试 VirtualHost

package com.example.mq;import com.example.mq.common.Consumer;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;@SpringBootTest
public class VirtualHostTests {private VirtualHost virtualHost = null;@BeforeEachpublic void setUp() {MqApplication.context = SpringApplication.run(MqApplication.class);virtualHost = new VirtualHost("default");}@AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost = null;// 把硬盘的目录删除掉File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testpublic void testExchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);}@Testpublic void testExchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@Testpublic void testQueueDeclare() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);}@Testpublic void testQueueDelete() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@Testpublic void testQueueBind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);}@Testpublic void testQueueUnbind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);}@Testpublic void testBasicPublish() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@Testpublic void testBasicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);} catch (Error e) {// 断言如果失败, 抛出的是 Error, 而不是 Exception!e.printStackTrace();System.out.println("error");}}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先发送消息, 后订阅队列.@Testpublic void testBasicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 往交换机中发布一个消息ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/6423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式websocket IM即时通讯聊天开源项目如何启动

前言 自己之前分享了分布式websocket的视频有同学去fork项目了&#xff0c;自己启动一下更方便理解项目嘛。然后把项目启动需要的东西全部梳理出来。支持群聊单聊,表情包以及发送图片。 支持消息可靠&#xff0c;消息防重&#xff0c;消息有序。同时基础架构有分布式权限&…

深入教程:在STM32上实现能源管理系统

引言 能源管理系统&#xff08;EMS&#xff09;在提高能源效率、减少能源消耗和支持可持续发展方面起着关键作用。本教程将介绍如何在STM32微控制器上开发一个能源管理系统&#xff0c;这种系统能够监控和控制能源使用&#xff0c;适用于家庭自动化、工业控制系统以及任何需要…

jQuery Moblie 笔记14 开发跨平台移动设备网页

相关内容&#xff1a;jQuery Moblie基础、操作、移动设备仿真器、jQuery Moblie网页实例、jQuery Moblie的UI组件、…… jQuery推出了一套新的函数库jQuery Mobile&#xff0c;目的是希望能够统一当前移动设备的用户界面(UI)。 移动设备开发应用程序目前大致分为两种&#xff…

MLP手写数字识别(3)-使用tf.data.Dataset模块制作模型输入(tensorflow)

1、tensorflow版本查看 import tensorflow as tfprint(Tensorflow Version:{}.format(tf.__version__)) print(tf.config.list_physical_devices())2、MNIST数据集下载与预处理 (train_images,train_labels),(test_images,test_labels) tf.keras.datasets.mnist.load_data()…

强大而简洁:初学Python必须掌握的14个单行代码

Python的魅力与单行代码的重要性 Python&#xff0c;作为一种高级编程语言&#xff0c;自诞生以来就以其简洁、易读、易学的特性而广受开发者喜爱。其魅力不仅在于其强大的功能和广泛的应用领域&#xff0c;更在于其能够用简洁的代码实现复杂的功能&#xff0c;这种能力在很大…

Redisson 分布式锁和同步器

系列文章目录 文章目录 系列文章目录前言前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 redisson 是基于redis的扩展库,使得redis除了应用于缓存以外,还能做队列…

FloodFill-----洪水灌溉算法(DFS例题详解)

目录 一.图像渲染&#xff1a; 代码详解&#xff1a; 二.岛屿数量&#xff1a; 代码详解&#xff1a; 三.岛屿的最大面积&#xff1a; 代码详解&#xff1a; 四.被围绕的区域&#xff1a; 代码详解&#xff1a; 五.太平洋大西洋水流问题&#xff1a; 代码详解&#x…

嵌入式单片机中必会的50个电路分享

单片机 电源 声音模块 收音机 485

操作系统-进程管理

1.进程的定义 2.进程的组成 3.进程的特点 4.进程控制结构 5.进程状态 6.进程挂起 6.线程优缺点 7.线程进程的比较 8 .为什么要使用线程 9.用户线程 9.内核线程 10.进程上下文切换信息储存在PCB中 11.fork()

JavaEE初阶Day 15:文件IO(1)

目录 Day 15&#xff1a;文件IO&#xff08;1&#xff09;IO文件1. 路径2. 文件的分类3. 使用Java针对文件系统进行操作3.1 属性3.2 构造方法3.3 方法 Day 15&#xff1a;文件IO&#xff08;1&#xff09; IO I&#xff1a;Input输入 O&#xff1a;Output输出 输入输出规则…

【python的魅力】:教你如何用几行代码实现文本语音识别

文章目录 引言一、运行效果二、文本转换为语音2.1 使用pyttsx32.2 使用SAPI实现文本转换语音2.3 使用 SpeechLib实现文本转换语音 三、语音转换为文本3.1 使用 PocketSphinx实现语音转换文本 引言 语音识别技术&#xff0c;也被称为自动语音识别&#xff0c;目标是以电脑自动将…

Tomcat启动闪退怎么解决(文末附终极解决方案)

AI是这么告诉我的 Tomcat启动时出现闪退问题可能由多种原因引起&#xff0c;以下是解决此类问题的一些通用方法&#xff1a; 检查环境变量&#xff1a; 确保已经正确设置了JAVA_HOME和JRE_HOME环境变量&#xff0c;并指向正确的Java安装路径。将Java的bin目录添加到系统的PATH…

c语言题目

一些关于c语言的题目 文章目录 一、计算程序输出二、以下程序运行时&#xff0c;若输入1abcedf2df<回车>输出结果是将flag的第二个bit置0结构体大小下列C程序执行后c输出结果为&#xff08;&#xff09;设有定义char *p[]{"Shanghai","Beijing",&quo…

scikit-learn:Python中的机器学习-1

简介&#xff1a;问题设置 什么是机器学习&#xff1f; 机器学习是关于构建具有可调参数的程序&#xff0c;这些参数可以自动调整&#xff0c;以便通过适应先前看到的数据来改善其行为。机器学习可以被认为是人工智能的一个子领域&#xff0c;因为这些算法可以被视为构建模块…

Python量化炒股的获取数据函数—get_index_stocks()

Python量化炒股的获取数据函数—get_index_stocks() 利用get_industry_stocks()函数可以获取在给定日期一个行业的所有股票代码列表&#xff0c;其语法格式如下&#xff1a; get_industry_stocks(industry_code, dateNone)各项参数的意义 参数date和返回值&#xff0c;都与g…

你知道什么是Charles吗?

什么是Charles? Charles中文名叫青花瓷&#xff0c;它是一款基于HTTP协议的代理服务器&#xff0c;通过成为电脑或者浏览器的代理&#xff0c;然后截取请求和请求结果达到分析抓包的目的。它跨平台、半免费&#xff0c;与免费版本不同的是&#xff0c;半免费版本的Charles重启…

八、Linux进程检测与控制

章节目标 了解进程和程序的关系了解进程的特点能够使用top动态查看进程信息能够使用ps静态查看进程信息能够使用kill命令给进程发送信号能够调整进程的优先级&#xff08;扩展&#xff09; 引言 在运维的日常工作中&#xff0c;监视系统的运行状况是每天例行的工作&#xff…

PPT基础

5种ppt仅可读形式 Ⅰ 开始选项卡 1.【幻灯片】组中&#xff1a;新建幻灯片&#xff0c;从大纲中导入幻灯片&#xff1b;修改幻灯片的版式&#xff1b;节&#xff08;新增节&#xff0c;重命名节&#xff09;。 2.【字体】组中&#xff1a;设置字体&#xff0c;字体大小&…

docker-compose启动mysql5.7报错

描述一下问题经过&#xff1a; 使用docker compose 部署mysql5.7 文件如下: services:mysql:restart: alwaysimage: mysql:5.7container_name: mysql-devports:- 3306:3306environment:- MYSQL_DATABASEdev- MYSQL_ROOT_PASSWORD123456healthcheck:test: ["CMD", &q…

Python+PYGObject/PYGtk+CSS样式--2024python示例

隔久点不用老是会忘&#xff0c;留个笔记。。 PythonPYGObject/PYGtk&#xff0c;加载 CSS 样式的演示代码 demo 运行的效果截图&#xff1a; #!/usr/bin/env python3 import sys import gigi.require_version("Gtk", "3.0") from gi.repository import …