从零手搓一个【消息队列】实现虚拟主机的核心功能

文章目录

  • 一、虚拟主机设计
  • 二、实现虚拟主机
    • 1, 创建 VirtualHost 类
    • 2, VirtualHost() 构造方法
    • 3, exchangeDeclare() 创建交换机
    • 4, exchageDelete() 删除交换机
    • 5, queueDeclare() 创建队列
    • 6, queueDelete() 删除队列
    • 7, queueBind() 创建绑定
    • 8, queueUnBind() 删除绑定
    • 9, basicPublish() 发布消息
    • 10, basicSubscribe() 订阅消息
    • 11, basicAck() 确认应答
  • 三、实现交换机的转发/绑定规则
    • 1, 设计 bindingKey 和 routingKey
    • 2, checkBindingKey() 检查 bindingKey是否合法
    • 3, checkRoutingKey() 检查 routingKey 是否合法
    • 4, route() 判定转发规则
    • 5, routeFanout() 规定扇出交换机转发规则
    • 6, routeTopic() 规定主题交换机转发规则
  • 四、小结


创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖

提示:是正在努力进步的小菜鸟一只,如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说,直接上干货!

整体目录结构 :
在这里插入图片描述

本文主要实现 server 包中的 VirtualHost 类


一、虚拟主机设计

到本篇为止, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来.并且实现⼀些 MQ 的核心 API

回顾 BrokerServer (中间人服务器) 中的核心概念 :

在这里插入图片描述

声明
在 RabbitMQ 中, 虚拟主机是可以随意创建/删除的. 此处为了实现简单, 并没有实现虚拟主机的管理. 因此我们默认就只有⼀个虚拟主机的存在. 但是在数据结构的设计上我们预留了对于多虚拟主机的管理

虚拟主机存在的目的, 就是为了隔离, 隔离不同业务线上的数据, 所有此处要考虑的是: 交换机和队列从属于虚拟主机中, 如何把不同虚拟主机中的交换机和队列区分开?
此处使用一个简单粗暴的方式, 我们设定交换机和队列的唯一身份标识加一个前缀, 这个前缀就是虚拟主机的唯一标识, 当交换机和队列区分开之后, 绑定和队列中的消息自然就区分开了

虚拟主机中需要的属性和方法 :
在这里插入图片描述

consumerManager 这个对象在下文介绍


二、实现虚拟主机

1, 创建 VirtualHost 类

  • virtualHostName 是虚拟主机的唯一身份标识
  • 虚拟主机要整合硬盘上的数据(数据库, 文件)和内存上的数据, 统一管理和使用, 所以引入 memoryDataCenterdiskDataCenter 这两个成员属性
  • router 用来定义交换机和队列之间的匹配规则和转发规则
  • consumerManager 是虚拟主机实现和消费者相关的 API 时的辅助对象, 这个对象中包含和消费者相关的 API, 仅供内部调用, 不对外暴露
  • exchangeLockqueueLock 是后续实现 API 时保证线程安全的锁对象
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter;private DiskDataCenter diskDataCenter;private Router router;private ConsumerManager consumerManager; private final Object exchangeLock = new Object();private final Object queueLock = new Object();
}

所以在前几篇文章中介绍的, 文件管理和内存管理, 都是在某个虚拟主机中的, 如果有多个虚拟主机, 每个虚拟主机中都有对应的文件和内存
这篇文章介绍的文件管理
在这里插入图片描述
这篇文章介绍的内存管理
在这里插入图片描述


2, VirtualHost() 构造方法

在构造方法中实现对刚才定义的成员属性的初始化, 对数据库文件的初始化

并且有可能本次不是第一次启动, 而是重启, 就需要恢复硬盘上的数据到内存中

	public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;this.memoryDataCenter = new MemoryDataCenter();this.diskDataCenter = new DiskDataCenter();this.router = new Router();this.consumerManager = new ConsumerManager(this);diskDataCenter.init();try {memoryDataCenter.recover(diskDataCenter);} catch (MQException | IOException e) {System.out.println("[VirtualHost] 内存恢复数据失败");e.printStackTrace();}}

另外提供一些 getter()

	public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter () {return diskDataCenter;}public String getVirtualHostName() {return virtualHostName;}

3, exchangeDeclare() 创建交换机

此处包括下文的核心 API 中的参数都是参考了 RabbitMQ, 并在 这篇文章 中介绍了创建 Exchange, Queue, Binding, Message等核心类时, 已经说明了部分属性本项目中暂不实现

先判断交换机是否已经存在, 如果不存在则创建, 如果存在也不会抛异常, 直接返回即可
根据参数中的 durable 判断是否要写入硬盘

    /*** 创建交换机* @param exchangeName 名称(唯一标识)* @param exchangeTypeEnum 类型* @param durable 是否持久化存储* @param autoDelete 是否自动删除* @param arguments 配置参数* @return  已存在返回 true, 不存在则创建*/public boolean exchangeDeclare(String exchangeName,ExchangeTypeEnum exchangeTypeEnum,boolean durable,boolean autoDelete,Map<String, Object> arguments) {exchangeName = virtualHostName + "-" + exchangeName;try {synchronized (exchangeLock) {// 1, 查询是否已经存在交换机Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);if (exchangeExists != null) {System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " +exchangeName + "的交换机已存在, 创建失败");return true;}// 2, 创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeTypeEnum);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3, 写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4, 写入内存memoryDataCenter.addExchange(exchange);System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " + exchangeName + "的交换机创建成功");return true;}} catch (Exception e) {System.out.println("[VirtualHost.exchangeDeclare()] exchangeName =  " +exchangeName + "的交换机创建失败");e.printStackTrace();return false;}}

按照先写入硬盘, 再写入内存的顺序编写代码, 因为写硬盘失败概率更⼤, 如果硬盘写失败了, 也就不必写内存了


4, exchageDelete() 删除交换机

先使用交换机唯一标识查找交换机, 该交换机如果不存在, 就无法删除

如果该交换机是持久化存储的, 则先删除硬盘, 再删除内存

	/*** 删除交换机* @param exchangeName 唯一标识* @return true/false*/public boolean exchangeDelete(String exchangeName) throws MQException {exchangeName = virtualHostName + "-" + exchangeName;try {synchronized (exchangeLock) {// 1, 查找该交换机Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);if (exchangeExists == null) {throw new MQException("VirtualHost.exchangeDelete() exchangeName = " +exchangeName + "的交换机不存在, 删除失败");}// 2, 硬盘删除if (exchangeExists.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3, 内存删除memoryDataCenter.removeExchange(exchangeName);System.out.println("VirtualHost.exchangeDelete() exchangeName =  "+ exchangeName + "的交换机删除成功");return true;}} catch (MQException e) {System.out.println("VirtualHost.exchangeDelete() exchangeName = " + exchangeName + "的交换机删除失败");e.printStackTrace();return false;}}

5, queueDeclare() 创建队列

    /*** 创建队列* @param queueName 唯一标识* @param durable 是否持久化* @param exclusive 是否被独占* @param autoDelete 是否自动删除* @param arguments 额外参数* @return true/false*/public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments) {queueName = virtualHostName + "-" + queueName;try {synchronized (queueLock) {// 1, 查找是否存在MessageQueue queueExists = memoryDataCenter.getQueue(queueName);if (queueExists != null) {System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列已存在, 创建失败");return true;}// 2, 创建队列MessageQueue queue = new MessageQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3, 硬盘存储if (durable) {diskDataCenter.insertQueue(queue);}// 4, 内存存储memoryDataCenter.addQueue(queue);System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列创建成功");return true;}} catch (Exception e) {System.out.println("[VirtualHost.queueDeclare()] queueName =  " + queueName + "的队列创建失败");e.printStackTrace();return false;}}

6, queueDelete() 删除队列

    /*** 删除队列* @param queueName 唯一标识* @return true/false*/public boolean queueDelete(String queueName) {queueName = virtualHostName + "-" + queueName;try {synchronized (queueLock) {MessageQueue queueExists = memoryDataCenter.getQueue(queueName);// 1, 检查是否存在if (queueExists == null) {throw new MQException("VirtualHost.queueDelete() queueName = " + queueName + "的队列已经存在, 删除失败");}// 2, 硬盘删除if (queueExists.isDurable()) {diskDataCenter.deleteQueue(queueName);}// 3, 内存删除memoryDataCenter.removeQueue(queueName);System.out.println("VirtualHost.queueDelete() queueName = " + queueName + "的队列删除成功");return true;}} catch (Exception e) {System.out.println("VirtualHost.queueDelete() queueName = " + queueName + "的队列删除失败");e.printStackTrace();return false;}}

7, queueBind() 创建绑定

  • 先检查交换机和队列的绑定是否存在, 如果存在, 则不创建绑定, 也不抛异常, 直接返回即可
  • 检查 bindingKey 是否合法(后续介绍判定规则)
  • 检查指定的交换机和队列是否存在, 如果不存在, 自然不能创建绑定, 应该抛异常
	public boolean queueBind(String exchangeName, String queueName, String bindingKey) {exchangeName = virtualHostName + "-" + exchangeName;queueName = virtualHostName + "-" + queueName;try {synchronized (exchangeLock) {synchronized (queueLock) {// 1, 检查绑定是否存在Binding bindingExists = memoryDataCenter.getBinding(exchangeName, queueName);if (bindingExists != null) {System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定已经存在, 无需重复创建");return true; // 这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??这里是否应该允许true??}// 2, 检查routingKey是否合法if (!router.checkBindingKey(bindingKey)) {throw new MQException("[VirtualHost.queueBind()] bindingKey = " + bindingKey + "不合法, 绑定创建失败");}// 3, 创建binding对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 4, 检查交换机/队列是否存在Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);MessageQueue queueExists = memoryDataCenter.getQueue(queueName);if(exchangeExists == null) {throw new MQException("[VirtualHost.queueBind()] exchangeName = " + exchangeName + "的队列不存在, 绑定创建失败");}if(queueExists == null){throw new MQException("[VirtualHost.queueBind()] queueName = " + queueName + "的队列不存在, 绑定创建失败");}// 5, 写入硬盘if (exchangeExists.isDurable() && queueExists.isDurable()) {diskDataCenter.insertBinding(binding);}// 6, 写入内存memoryDataCenter.addBinding(binding);System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定创建成功");return true;}}} catch (Exception e) {System.out.println("[VirtualHost.queueBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定创建失败");e.printStackTrace();return false;}}

8, queueUnBind() 删除绑定

先检查绑定是否存在, 如果不存在, 自然不能删除, 应该抛异常

    public boolean queueUnBind(String exchangeName, String queueName) {exchangeName = virtualHostName + "-" + exchangeName;queueName = virtualHostName + "-" + queueName;try {synchronized (exchangeLock) {synchronized (queueLock) {// 1, 检查 binding 是否存在Binding bindingExists = memoryDataCenter.getBinding(exchangeName, queueName);if (bindingExists == null) {throw new MQException("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定不存在, 删除失败");}// 2, 硬盘删除diskDataCenter.deleteBinding(bindingExists);// 3, 内存删除memoryDataCenter.removeBinding(bindingExists);System.out.println("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定删除成功");return true;}}} catch (Exception e) {System.out.println("[VirtualHost.queueUnBind()] exchangeName = " + exchangeName+ ", queueName = " + queueName + "的绑定删除失败");e.printStackTrace();return false;}}

9, basicPublish() 发布消息

  • 在创建绑定的时候判定了 bindingKey 是否合法, 在此处发布消息的时候要判定 routingKey 是否合法(这个方法下面介绍)
  • 检查交换机是否存在
  • 如果是直接交换机, 直接把 routingKey 作为队列名, 找到该队列
  • 如果是扇出/主题交换机, 要先判定 bindingKey 和 routingKey 是否匹配, 如果匹配才能转发(这个方法下面介绍)
	public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body) {exchangeName = virtualHostName + "-" + exchangeName;try {// 1, 检查routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MQException("[VirtualHost.basicPublish()] routingKey = " + routingKey + "不合法, 消息发布失败");}// 2, 查找交换机是否存在Exchange exchangeExists = memoryDataCenter.getExchange(exchangeName);if (exchangeExists == null) {throw new MQException("[VirtualHost.basicPublish()] exchangeName = " + exchangeName + "的交换机不存在, 消息发布失败");}// 3, 构造消息对象Message message = Message.createMessage(routingKey, basicProperties, body);// 4, 判定交换机类型if (exchangeExists.getType() == ExchangeTypeEnum.DIRECT) {// 如果是直接交换机, routingKey就作为队列名String queueName = virtualHostName + "-" + routingKey;// 判定队列是否存在MessageQueue queueExists = memoryDataCenter.getQueue(queueName);if (queueExists == null) {throw new MQException("[VirtualHost.basicPublish()] queueName = " + queueName + "的队列不存在, 消息发布失败");}sendMessage(queueExists, message);} else {ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {Binding binding = entry.getValue();// 判定队列是否存在, 以及 bindingKey 和 routingKey 是否匹配MessageQueue queueExists = memoryDataCenter.getQueue(binding.getQueueName());if (queueExists == null || !router.route(exchangeExists.getType(), binding, message)) {continue;}sendMessage(queueExists, message);}}return true;} catch (MQException e) {System.out.println("[VirtualHost.basicPublish()] 消息发布失败");e.printStackTrace();return false;}}
  • 首先队列为持久化存储, 并且该消息是持久化存储, 才能写入硬盘
  • 生产者发布了消息, 此时应该提醒消费者消费消息(这个方法后续实现)
    private void sendMessage(MessageQueue queue, Message message) {try {// 1, 写入硬盘if ( queue.isDurable() && message.getDeliverMode() == 2 ) {diskDataCenter.sendMessage(queue, message);}// 2, 写入内存memoryDataCenter.sendMessage(queue, message);// 3, 给消费者推送消息(message-->queue, queueName-->tokenQueue)consumerManager.notifyConsumer(queue.getName());} catch (InterruptedException | IOException | MQException e) {System.out.println("'[VirtualHost.sendMessage()] 消息发送失败");e.printStackTrace();}}

10, basicSubscribe() 订阅消息

这个方法涉及到 ConsumerManager 这个类的实现, 下篇文章再介绍 ConsumerManager

这个方法只需要做一步: 添加一个消费者, 后续的逻辑(服务器收到消息后就转发给消费者)交给 ConsumerManager 处理

    /*** 添加一个指定队列的消费者, 来订阅消息* 队列中有消息了就推送给订阅了该队列的消费者(订阅者)* @param consumerTag 消费者唯一身份标识* @param queueName   队列唯一身份标识* @param autoAck     是否自动应答* @param consumable  实现把消息推送给订阅者的接口*/public boolean basicSubscribe(String consumerTag,String queueName,boolean autoAck,Consumable consumable) {queueName = virtualHostName + "-" + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumable);System.out.println("[VirtualHost.basicSubscribe()] consumerTag = " + consumerTag + "添加消费者成功");return true;} catch (MQException e) {System.out.println("[VirtualHost.basicSubscribe()] consumerTag = " + consumerTag + "添加消费者失败");e.printStackTrace();return false;}}

11, basicAck() 确认应答

这个方法用于, 服务器给消费者推送消息之后, 如果消费者选择手动应答, 就应该主动的调用服务器的这个方法

  • 先查询队列和消息是否都存在
  • 如果都存在, 分别在硬盘和内存中, 删除该消息的即可
	public boolean basicAck(String queueName, String messageId) {queueName = virtualHostName + "-" + queueName;try {// 1, 查找队列和消息MessageQueue queueExists = memoryDataCenter.getQueue(queueName);if (queueExists == null) {throw new MQException("[VirtualHost.basicAck()] queueName = " + queueName + "的交换机不存在, 确认应答失败");}Message messageExists = memoryDataCenter.getMessage(messageId);if (messageExists == null) {throw new MQException("[VirtualHost.basicAck()] messageId = " + messageId + "的消息不存在, 确认应答失败");}// 2, 硬盘删除if (queueExists.isDurable() && messageExists.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queueExists, messageExists);}// 3, 内存删除memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageNotAck(queueName, messageId);System.out.println("[VirtualHost.basicAck()] queueName = " + queueName +", messageId = " + messageId + "的确认应答成功");return true;} catch (MQException | IOException e) {System.out.println("[VirtualHost.basicAck()] queueName = " + queueName +", messageId = " + messageId + "的确认应答失败");e.printStackTrace();return false;}}

三、实现交换机的转发/绑定规则

在这里插入图片描述

在 server.core.Router 类中编写代码, 这篇文章 介绍了核心类的实现, 当时没有实现这一块的内容
在这里插入图片描述


1, 设计 bindingKey 和 routingKey

⼀个 routingKey 是由数字, 字⺟, 下划线构成的, 并且可以使⽤ . 分成若⼲部分.
形如 aaa.bbb.ccc
⼀个 bindingKey 是由数字, 字⺟, 下划线构成的, 并且使⽤ . 分成若⼲部分.
另外, ⽀持 *# 两种通配符. (*, # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤, ⽐如 a.*.b 是合法的, a.*a.b 是不合法的).

  • 其中 * 可以匹配任意⼀个单词.
  • 其中 # 可以匹配任意零个或者多个单词.

bindingKey 为 a.*.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b
bindingKey 为 a.#.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b 和 a.aa.bb.b 和 a.b


2, checkBindingKey() 检查 bindingKey是否合法

bindngKey可以为""(空串), 如果交换机类型为直接 / 扇出, bindingKey 用不上, 参数传""即可

  1. 允许是空字符串
  2. 数字字⺟下划线构成
  3. 可以包含通配符
  4. # 不能连续出现
  5. #*不能相邻
	public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {return true;}for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9')|| (ch == '_' || ch == '.' || ch == '*' || ch == '#')) {continue;}return false;}String[] bindingKeyFragments = bindingKey.split("\\.");for (String fragment : bindingKeyFragments) {if (fragment.length() > 1 && (fragment.contains("*") || fragment.contains("#"))) {return false;}}for (int i = 0; i < bindingKeyFragments.length - 1; i++) {// 连续两个 ##if (bindingKeyFragments[i].equals("#") && bindingKeyFragments[i + 1].equals("#")) {return false;}// # 连着 *if (bindingKeyFragments[i].equals("#") && bindingKeyFragments[i + 1].equals("*")) {return false;}// * 连着 #if (bindingKeyFragments[i].equals("*") && bindingKeyFragments[i + 1].equals("#")) {return false;}}return true;}

3, checkRoutingKey() 检查 routingKey 是否合法

如果是扇出交换机, routingKey 用不上, 设置为""即可

    public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);if ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9')|| (ch == '_' || ch == '.' )) {continue;}return false;}return true;}

4, route() 判定转发规则

直接交换机没有转发这一说, 在上述 basicPublish() 这个方法里已经单独处理过了

	public boolean route(ExchangeTypeEnum exchangeTypeEnum, Binding binding, Message message) throws MQException {if (exchangeTypeEnum == ExchangeTypeEnum.FANOUT) {return routeFanout(binding, message);} else if (exchangeTypeEnum == ExchangeTypeEnum.TOPIC) {return routeTopic(binding, message);}else {throw new MQException("[Router.route()] 非法的交换机类型");}}

5, routeFanout() 规定扇出交换机转发规则

没有转发规则, 只要是绑定了的队列都能转发, 这里单拎出来一个方法是为了代码风格统一

    public boolean routeFanout(Binding binding, Message message) {return true;}

6, routeTopic() 规定主题交换机转发规则

这个方法就是用来实现判定 routingKey 和 bindingKey 是否匹配了

	public boolean routeTopic(Binding binding, Message message) {String[] bindingKeyFragments = binding.getBindingKey().split("\\.");String[] routingKeyFragments = message.getRoutingKey().split("\\.");int i = 0;int j = 0;while (i < bindingKeyFragments.length && j < routingKeyFragments.length) {// 遇到 * 只能匹配一个字符if (bindingKeyFragments[i].equals("*")) {i++;j++;continue;}// 遇到 # 就找下一个片段if (bindingKeyFragments[i].equals("#")) {i++;if (i == bindingKeyFragments.length) {return true;}// 说明#后面还有片段, 让 j 寻找这个片段int nextMatchIndex = findNextMatchIndex(bindingKeyFragments[i], j, routingKeyFragments);if (nextMatchIndex == -1) {return false;}j = nextMatchIndex;i++;j++;continue;}if (!bindingKeyFragments[i].equals(routingKeyFragments[j])) {return false;}i++;j++;}return i == bindingKeyFragments.length && j == routingKeyFragments.length;}public int findNextMatchIndex(String cur, int j, String[] routingKeyFragments) {while (j < routingKeyFragments.length) {if (routingKeyFragments[j].equals(cur)) {return j;}j++;}return -1;}

四、小结

本篇主要实现了"虚拟主机"
虚拟主机的作用是为了隔离不同业务线的数据, 本项目暂时只支持一个虚拟主机

  • 虚拟主机把硬盘(数据库+文件)和内存这两个模块的数据管理整合在一起, 并且封装了一系列核心 API, 供上层( BrokerServer )调用
  • 在server.core 包下实现了 Router 类, 这个类主要是在生产者发布消息时, 用来检查 bindingKey 和routingKey 是否合法, routingKey 和 bindingKey 是否匹配等, 为了实现服务器支持三种不同的交换机转发模式

在这里插入图片描述

这些核心 API 基本上都实现了, 还有 basicSubscribe() 这个 API 没有完全实现, 在下篇文章会介绍 ConsumerManager 类, 用来实现和消费者消费消息相关的逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/93507.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vscode 注释插件koroFileHeader

https://blog.51cto.com/u_15785499/5664323 https://blog.csdn.net/weixin_67697081/article/details/129004675

YoloV5实时推理最短的代码

YoloV5实时推理最简单代码 import cv2 import torch# 加载YOLOv5模型 model torch.hub.load(ultralytics/yolov5, yolov5s)# 使用CPU或GPU进行推理 device cuda if torch.cuda.is_available() else cpu model.to(device)# 打开摄像头&#xff08;默认摄像头&#xff09; cap…

vue pc端/手机移动端 — 下载导出当前表格页面pdf格式

一、需求&#xff1a;在手机端/pc端实现一个表格页面&#xff08;缴费单/体检报告单等&#xff09;的导出功能&#xff0c;便于用户在本地浏览打印。 二、实现&#xff1a;之前在pc端做过预览打印的功能&#xff0c;使用的是print.js之类的方法让当前页面直接唤起打印机的打印预…

【好玩的开源项目】Docker部署cook菜谱工具

【好玩的开源项目】Docker部署cook菜谱工具 一、cook菜谱工具介绍二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本 四、下载cook镜像五、部署cook菜谱工具5.1 创建cook容器5.2 查看容器状态5.3 检查容器日志 六、…

Oracle is和as 关键字学习

之前写的Oracle存储过程中都有is和as关键字&#xff1b;下面学习这二个关键字&#xff1b; Oracle中is可用于以下情况&#xff1a; 判断某个值是否为null。在Oracle中&#xff0c;null表示一个未知或不适用的值。因此&#xff0c;我们需要使用is null或is not null语句来检查某…

OpenNebula的配置与应用

学习了OpenNebula的安装之后&#xff0c;接下来就是配置OpenNebula&#xff0c;内容包括配置Sunstone&#xff0c;VDC和集群&#xff0c;设置影像&#xff0c;模板管理&#xff0c;虚拟机管理等。OpenNebula还有大量的工作要做&#xff0c;这些工作主要来自映像、模板和虚拟机管…

Redis主从复制、哨兵、cluster集群

目录 Redis 主从复制 主从复制的作用 主从复制流程 搭建Redis 主从复制 实验环境 所有主机安装redis 修改 Redis 配置文件&#xff08;Master节点操作&#xff09; 修改 Redis 配置文件&#xff08;Slave节点操作&#xff09; 验证主从效果 Redis 哨兵模式 哨兵模式的…

算法通过村第十一关-位运算|黄金笔记|位运算压缩

文章目录 前言用4kb内存寻找重复元素总结 前言 提示&#xff1a;如果谁对你说了地狱般的话&#xff0c;就代表了他的心在地狱。你不需要相信那样的话&#xff0c;就算对方是你的父母也一样。 --高延秀《远看是蔚蓝的春天》 位运算有个很重要的作用就是能用比较小的空间存储比较…

思科:iOS和iOSXe软件存在漏洞

思科警告说,有人试图利用iOS软件和iOSXe软件中的一个安全缺陷,这些缺陷可能会让一个经过认证的远程攻击者在受影响的系统上实现远程代码执行。 中严重程度的脆弱性被追踪为 CVE-2023-20109 ,并以6.6分得分。它会影响启用Gdoi或G-Ikev2协议的软件的所有版本。 国际知名白帽黑客…

世界前沿技术发展报告2023《世界航天技术发展报告》(二)卫星技术

&#xff08;二&#xff09;卫星技术 1.概述2. 通信卫星2.1 美国太空发展局推进“国防太空体系架构”&#xff0c;持续部署“传输层”卫星2.2 美国军方在近地轨道成功演示验证星间激光通信2.3 DARPA启动“天基自适应通信节点”项目&#xff0c;为增强太空通信在轨互操作能力提供…

程序员的重复劳动陷阱

https://kb.cnblogs.com/page/627035/ 同样是一样的计算机专业毕业&#xff0c;进入职场的职位和工作都差不多&#xff0c;为何有些程序员短短几年就成长为全能选手或领域专家&#xff0c;有些程序员还在做CRUD&#xff1f; 程序员的重复劳动陷阱 不知道大家有没有这样的感觉…

2023年中国医疗传感器行业现状分析:市场国有化率低[图]

传感器是对物理刺激&#xff08;如热、光、声、压力、磁或特定的运动&#xff09;作出反应并传送产生的脉冲&#xff08;如用于测量或操作控制&#xff09;的装置。传感器一般由敏感元件、转换元件和转换电路组成。 医疗传感器分类 资料来源&#xff1a;共研产业咨询&#xff…

管道-有名管道

一、有名管道 有名管道与匿名管道的不同&#xff1a; 有名管道提供了一个路径名&#xff0c;并以FIFO的文件形式存在于文件系统中。与匿名管道不同&#xff0c;有名管道可以被不相关的进程使用&#xff0c;只要它们可以访问该路径&#xff0c;就能够通过有名管道进行通信。 FI…

基于SSM的学生事务处理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

MySQL 索引优化实践(单表)

目录 一、前言二、表数据准备三、常见业务无索引查询耗时测试3.1、通过订单ID / 订单编号 查询指定订单3.2、查询订单列表 四、订单常见业务索引优化实践4.1、通过唯一索引和普通索引优化通过订单编号查询订单信息4.2、通过普通联合索引优化订单列表查询4.2.1、分析查询字段的查…

GROMACS Tutorial 5: Protein-Ligand Complex 中文实战教程

GROMACS Tutorial 5: Protein-Ligand Complex 中文实战教程 前言系统环境特别强调一、预处理阶段1.1 蛋白质配体分离以及除水操作1.2 选择力场识别JZ4配体1.2.1 使用在线力场解析1.2.2 使用官方推荐力场CHARMM36解析 1.3 蛋白的top文件准备1.4 配体的top文件准备1.5 使用CgenFF…

【C++】模板初阶 -- 详解

一、泛型编程 // 实现一个通用的交换函数&#xff1a; void Swap(int& left, int& right) {int temp left;left right;right temp; }void Swap(double& left, double& right) {double temp left;left right;right temp; }void Swap(char& left, ch…

高層建築設計和建造:從避難層到設備間和防風防火防水的設計理念,酒店住宅辦公樓都有什麽房間(精簡)

樓層概覽 標準層居住、辦公、商業等功能的樓層。結構和裝修與其他樓層相同&#xff0c;可供人正常居住、工作和活動避難層專門用於人員避難的樓層&#xff0c;通常會相隔數十個標準層&#xff0c;樓梯通常和標準層是錯開的(非公用)&#xff0c;具有更多的通風口。牆體和樓板具…

【Linux】TCP的服务端 + 客户端

文章目录 &#x1f4d6; 前言1. 服务端基本结构1.1 类成员变量&#xff1a;1.2 头文件1.3 初始化&#xff1a;1.3 - 1 全双工与半双工1.3 - 2 inet_aton1.3 - 3 listen 2. 服务端运行接口2.1 accept&#xff1a;2.2 服务接口&#xff1a; 3. 客户端3.1 connect&#xff1a;3.2 …

linux 笔记 安装 anaconda

1 找到anaconda 安装包 Free Download | Anaconda 2 在linux环境中安装对应安装包 3 安装完毕后查看是否安装好 发现不行&#xff0c;需要配置环境变量 4 配置环境变量 vim /etc/profile使用这个&#xff0c;发现对应的文件是只读文件 sudo vim /etc/profile在最下面加一…